You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/20 03:50:13 UTC
[rocketmq] 01/03: [ISSUE #3906] Mark stream-related request by RequestType
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 34eb2ce7d17caefea63132a4e30bc425594abddd
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Apr 19 11:16:31 2022 +0800
[ISSUE #3906] Mark stream-related request by RequestType
---
.../org/apache/rocketmq/client/ClientConfig.java | 25 ++++++++++++-
.../client/consumer/DefaultLitePullConsumer.java | 1 +
.../client/consumer/DefaultMQPullConsumer.java | 1 +
.../rocketmq/client/impl/MQClientAPIImpl.java | 7 +++-
.../java/org/apache/rocketmq/common/MixAll.java | 1 +
.../rocketmq/common/rpchook/StreamTypeRPCHook.java | 34 ++++++++++++++++++
.../rocketmq/remoting/protocol/RequestType.java | 41 ++++++++++++++++++++++
7 files changed, 108 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 4452bbdfa..eeb882673 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.utils.NameServerAddressUtils;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RequestType;
/**
* Client Common configuration
@@ -65,6 +66,12 @@ public class ClientConfig {
private LanguageCode language = LanguageCode.JAVA;
+ /**
+ * Enabling the stream request type injects an RPCHook that adds the corresponding request type at the remoting layer.
+ * It also generates a different client id to prevent unexpected reuse of an MQClientInstance.
+ */
+ protected boolean enableStreamRequestType = false;
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -76,6 +83,11 @@ public class ClientConfig {
sb.append(this.unitName);
}
+ if (enableStreamRequestType) {
+ sb.append("@");
+ sb.append(RequestType.STREAM);
+ }
+
return sb.toString();
}
@@ -160,6 +172,7 @@ public class ClientConfig {
this.namespace = cc.namespace;
this.language = cc.language;
this.mqClientApiTimeout = cc.mqClientApiTimeout;
+ this.enableStreamRequestType = cc.enableStreamRequestType;
}
public ClientConfig cloneClientConfig() {
@@ -179,6 +192,7 @@ public class ClientConfig {
cc.namespace = namespace;
cc.language = language;
cc.mqClientApiTimeout = mqClientApiTimeout;
+ cc.enableStreamRequestType = enableStreamRequestType;
return cc;
}
@@ -318,12 +332,21 @@ public class ClientConfig {
this.mqClientApiTimeout = mqClientApiTimeout;
}
+ public boolean isEnableStreamRequestType() {
+ return enableStreamRequestType;
+ }
+
+ public void setEnableStreamRequestType(boolean enableStreamRequestType) {
+ this.enableStreamRequestType = enableStreamRequestType;
+ }
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
+ + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
+ + ", enableStreamRequestType=" + enableStreamRequestType + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 8c7f0f0b3..7799166f2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -220,6 +220,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
+ this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 4784e72e1..5e2138e81 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -117,6 +117,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
+ this.enableStreamRequestType = true;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 245195f07..5d7e1685e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.impl;
+import com.alibaba.fastjson.JSON;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -152,6 +153,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerR
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -169,7 +171,6 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import com.alibaba.fastjson.JSON;
public class MQClientAPIImpl {
@@ -195,6 +196,10 @@ public class MQClientAPIImpl {
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
+ // Register the stream RPC hook before the user-supplied hook so the request-type field is added first and can be covered by any later signature
+ if (clientConfig.isEnableStreamRequestType()) {
+ this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
+ }
this.remotingClient.registerRPCHook(rpcHook);
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index cf28f2e22..b13a09f92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -89,6 +89,7 @@ public class MixAll {
public static final String REPLY_MESSAGE_FLAG = "reply";
public static final String LMQ_PREFIX = "%LMQ%";
public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
+ public static final String REQ_T = "ReqT";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static String getWSAddr() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java
new file mode 100644
index 000000000..7a74bc52f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.rpchook;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestType;
+/** RPCHook that stamps a request with the STREAM request type; its response callback has an empty body. */
+public class StreamTypeRPCHook implements RPCHook {
+ @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+ request.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode())); // ext field MixAll.REQ_T carries the numeric code as a string
+ }
+ /** Response callback; performs no work. */
+ @Override public void doAfterResponse(String remoteAddr, RemotingCommand request,
+ RemotingCommand response) {
+ // Intentionally empty: this hook only acts in doBeforeRequest.
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java
new file mode 100644
index 000000000..65217d5b8
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+/** Request types that can be attached to a remoting request; each constant carries a one-byte code. */
+public enum RequestType {
+ STREAM((byte) 0); // the only type defined here; its code is 0
+ /** Byte code identifying this request type. */
+ private final byte code;
+ /** Stores the given code on the constant. */
+ RequestType(byte code) {
+ this.code = code;
+ }
+ /** Looks up the constant whose code equals {@code code}; returns {@code null} when none matches. */
+ public static RequestType valueOf(byte code) {
+ for (RequestType requestType : RequestType.values()) {
+ if (requestType.getCode() == code) {
+ return requestType;
+ }
+ }
+ return null; // no constant has this code
+ }
+ /** Returns the byte code of this request type. */
+ public byte getCode() {
+ return code;
+ }
+}