You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/20 03:50:13 UTC

[rocketmq] 01/03: [ISSUE #3906] Mark stream-related request by RequestType

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 34eb2ce7d17caefea63132a4e30bc425594abddd
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Apr 19 11:16:31 2022 +0800

    [ISSUE #3906] Mark stream-related request by RequestType
---
 .../org/apache/rocketmq/client/ClientConfig.java   | 25 ++++++++++++-
 .../client/consumer/DefaultLitePullConsumer.java   |  1 +
 .../client/consumer/DefaultMQPullConsumer.java     |  1 +
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  7 +++-
 .../java/org/apache/rocketmq/common/MixAll.java    |  1 +
 .../rocketmq/common/rpchook/StreamTypeRPCHook.java | 34 ++++++++++++++++++
 .../rocketmq/remoting/protocol/RequestType.java    | 41 ++++++++++++++++++++++
 7 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 4452bbdfa..eeb882673 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.utils.NameServerAddressUtils;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RequestType;
 
 /**
  * Client Common configuration
@@ -65,6 +66,12 @@ public class ClientConfig {
 
     private LanguageCode language = LanguageCode.JAVA;
 
+    /**
+     * Enable stream request type will inject a RPCHook to add corresponding request type to remoting layer.
+     * And it will also generate a different client id to prevent unexpected reuses of MQClientInstance.
+     */
+    protected boolean enableStreamRequestType = false;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -76,6 +83,11 @@ public class ClientConfig {
             sb.append(this.unitName);
         }
 
+        if (enableStreamRequestType) {
+            sb.append("@");
+            sb.append(RequestType.STREAM);
+        }
+
         return sb.toString();
     }
 
@@ -160,6 +172,7 @@ public class ClientConfig {
         this.namespace = cc.namespace;
         this.language = cc.language;
         this.mqClientApiTimeout = cc.mqClientApiTimeout;
+        this.enableStreamRequestType = cc.enableStreamRequestType;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -179,6 +192,7 @@ public class ClientConfig {
         cc.namespace = namespace;
         cc.language = language;
         cc.mqClientApiTimeout = mqClientApiTimeout;
+        cc.enableStreamRequestType = enableStreamRequestType;
         return cc;
     }
 
@@ -318,12 +332,21 @@ public class ClientConfig {
         this.mqClientApiTimeout = mqClientApiTimeout;
     }
 
+    public boolean isEnableStreamRequestType() {
+        return enableStreamRequestType;
+    }
+
+    public void setEnableStreamRequestType(boolean enableStreamRequestType) {
+        this.enableStreamRequestType = enableStreamRequestType;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
             + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + ", mqClientApiTimeout=" + mqClientApiTimeout
+            + ", enableStreamRequestType=" + enableStreamRequestType + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 8c7f0f0b3..7799166f2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -220,6 +220,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
+        this.enableStreamRequestType = true;
         defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 4784e72e1..5e2138e81 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -117,6 +117,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     public DefaultMQPullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
+        this.enableStreamRequestType = true;
         defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 245195f07..5d7e1685e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import com.alibaba.fastjson.JSON;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -152,6 +153,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerR
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rpchook.StreamTypeRPCHook;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -169,7 +171,6 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-import com.alibaba.fastjson.JSON;
 
 public class MQClientAPIImpl {
 
@@ -195,6 +196,10 @@ public class MQClientAPIImpl {
         this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
         this.clientRemotingProcessor = clientRemotingProcessor;
 
+        // Inject stream rpc hook first to make reserve field signature
+        if (clientConfig.isEnableStreamRequestType()) {
+            this.remotingClient.registerRPCHook(new StreamTypeRPCHook());
+        }
         this.remotingClient.registerRPCHook(rpcHook);
         this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index cf28f2e22..b13a09f92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -89,6 +89,7 @@ public class MixAll {
     public static final String REPLY_MESSAGE_FLAG = "reply";
     public static final String LMQ_PREFIX = "%LMQ%";
     public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
+    public static final String REQ_T = "ReqT";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     public static String getWSAddr() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java
new file mode 100644
index 000000000..7a74bc52f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rpchook/StreamTypeRPCHook.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.rpchook;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestType;
+
+public class StreamTypeRPCHook implements RPCHook {
+    @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+        request.addExtField(MixAll.REQ_T, String.valueOf(RequestType.STREAM.getCode()));
+    }
+
+    @Override public void doAfterResponse(String remoteAddr, RemotingCommand request,
+        RemotingCommand response) {
+
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java
new file mode 100644
index 000000000..65217d5b8
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+public enum RequestType {
+    STREAM((byte) 0);
+
+    private final byte code;
+
+    RequestType(byte code) {
+        this.code = code;
+    }
+
+    public static RequestType valueOf(byte code) {
+        for (RequestType requestType : RequestType.values()) {
+            if (requestType.getCode() == code) {
+                return requestType;
+            }
+        }
+        return null;
+    }
+
+    public byte getCode() {
+        return code;
+    }
+}