You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:53:42 UTC
[rocketmq] 01/26: [ISSUE #5406] Add processor for remoting messaging module
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 934f5b657639aede31a70aa902b25207ba16d179
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Oct 27 16:00:13 2022 +0800
[ISSUE #5406] Add processor for remoting messaging module
---
.../common/attribute/TopicMessageType.java | 18 +++
.../apache/rocketmq/proxy/config/ProxyConfig.java | 30 ++++
.../proxy/processor/AbstractProcessor.java | 19 ---
.../proxy/processor/DefaultMessagingProcessor.java | 14 ++
.../proxy/processor/MessagingProcessor.java | 6 +
.../proxy/processor/ProducerProcessor.java | 2 +-
.../proxy/processor/RequestBrokerProcessor.java | 39 +++++
.../activity/AbstractRemotingActivity.java | 166 +++++++++++++++++++++
.../remoting/activity/AckMessageActivity.java | 38 +++++
.../activity/ChangeInvisibleTimeActivity.java | 38 +++++
.../remoting/activity/ConsumerManagerActivity.java | 112 ++++++++++++++
.../remoting/activity/GetTopicRouteActivity.java | 71 +++++++++
.../remoting/activity/PopMessageActivity.java | 41 +++++
.../remoting/activity/PullMessageActivity.java | 72 +++++++++
.../remoting/activity/SendMessageActivity.java | 96 ++++++++++++
.../proxy/remoting/pipeline/RequestPipeline.java | 34 ++---
.../proxy/service/channel/SimpleChannel.java | 5 +
.../service/message/ClusterMessageService.java | 37 ++++-
.../proxy/service/message/LocalMessageService.java | 12 ++
.../proxy/service/message/MessageService.java | 6 +
.../activity/AbstractRemotingActivityTest.java | 84 +++++++++++
21 files changed, 892 insertions(+), 48 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 8c484da31..5e6629e3b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -18,7 +18,9 @@
package org.apache.rocketmq.common.attribute;
import com.google.common.collect.Sets;
+import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.common.message.MessageConst;
public enum TopicMessageType {
UNSPECIFIED("UNSPECIFIED"),
@@ -40,6 +42,22 @@ public enum TopicMessageType {
return value;
}
+ public static TopicMessageType parseFromMessageProperty(Map<String, String> messageProperty) {
+ String isTrans = messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+ String isTransValue = "true";
+ if (isTransValue.equals(isTrans)) {
+ return TopicMessageType.TRANSACTION;
+ } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
+ || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
+ || messageProperty.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
+ return TopicMessageType.DELAY;
+ } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) {
+ return TopicMessageType.FIFO;
+ } else {
+ return TopicMessageType.NORMAL;
+ }
+ }
+
public String getMetricsValue() {
return value.toLowerCase();
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 6bb488984..cbedc3c50 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -178,6 +178,12 @@ public class ProxyConfig implements ConfigFile {
// Example address: 127.0.0.1:1234
private String metricCollectorAddress = "";
+ private String regionId = "";
+
+ private boolean traceOn = false;
+
+ private String remotingAccessPoint = "";
+
private BrokerConfig.MetricsExporterType metricsExporterType = BrokerConfig.MetricsExporterType.DISABLE;
private String metricsGrpcExporterTarget = "";
@@ -961,6 +967,30 @@ public class ProxyConfig implements ConfigFile {
this.grpcClientIdleTimeMills = grpcClientIdleTimeMills;
}
+ public String getRegionId() {
+ return regionId;
+ }
+
+ public void setRegionId(String regionId) {
+ this.regionId = regionId;
+ }
+
+ public boolean isTraceOn() {
+ return traceOn;
+ }
+
+ public void setTraceOn(boolean traceOn) {
+ this.traceOn = traceOn;
+ }
+
+ public String getRemotingAccessPoint() {
+ return remotingAccessPoint;
+ }
+
+ public void setRemotingAccessPoint(String remotingAccessPoint) {
+ this.remotingAccessPoint = remotingAccessPoint;
+ }
+
public BrokerConfig.MetricsExporterType getMetricsExporterType() {
return metricsExporterType;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
index c223eb478..679cc4b3d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
@@ -16,10 +16,7 @@
*/
package org.apache.rocketmq.proxy.processor;
-import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -41,20 +38,4 @@ public abstract class AbstractProcessor extends AbstractStartAndShutdown {
throw new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is expired");
}
}
-
- protected TopicMessageType parseFromMessageExt(Message message) {
- String isTrans = message.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- String isTransValue = "true";
- if (isTransValue.equals(isTrans)) {
- return TopicMessageType.TRANSACTION;
- } else if (message.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
- || message.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
- || message.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
- return TopicMessageType.DELAY;
- } else if (message.getProperty(MessageConst.PROPERTY_SHARDING_KEY) != null) {
- return TopicMessageType.FIFO;
- } else {
- return TopicMessageType.NORMAL;
- }
- }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 95fba895a..1b7baba0a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -60,6 +60,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
protected ConsumerProcessor consumerProcessor;
protected TransactionProcessor transactionProcessor;
protected ClientProcessor clientProcessor;
+ protected RequestBrokerProcessor requestBrokerProcessor;
protected ThreadPoolExecutor producerProcessorExecutor;
protected ThreadPoolExecutor consumerProcessorExecutor;
@@ -88,6 +89,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
this.consumerProcessor = new ConsumerProcessor(this, serviceManager, this.consumerProcessorExecutor);
this.transactionProcessor = new TransactionProcessor(this, serviceManager);
this.clientProcessor = new ClientProcessor(this, serviceManager);
+ this.requestBrokerProcessor = new RequestBrokerProcessor(this, serviceManager);
this.init();
}
@@ -218,6 +220,18 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
return this.consumerProcessor.getMinOffset(ctx, messageQueue, timeoutMillis);
}
+ @Override
+ public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+ return this.requestBrokerProcessor.request(ctx, brokerName, request, timeoutMillis);
+ }
+
+ @Override
+ public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+ return this.requestBrokerProcessor.requestOneway(ctx, brokerName, request, timeoutMillis);
+ }
+
@Override
public void registerProducer(ProxyContext ctx, String producerGroup, ClientChannelInfo clientChannelInfo) {
this.clientProcessor.registerProducer(ctx, producerGroup, clientChannelInfo);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 89be595ec..3e8b8084e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -230,6 +230,12 @@ public interface MessagingProcessor extends StartAndShutdown {
long timeoutMillis
);
+ CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis);
+
+ CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis);
+
void registerProducer(
ProxyContext ctx,
String producerGroup,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 95bd0e5fe..2fce78d31 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -73,7 +73,7 @@ public class ProducerProcessor extends AbstractProcessor {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic);
- TopicMessageType messageType = parseFromMessageExt(message);
+ TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(message.getProperties());
topicMessageTypeValidator.validate(topicMessageType, messageType);
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java
new file mode 100644
index 000000000..9f3187cde
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/RequestBrokerProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class RequestBrokerProcessor extends AbstractProcessor {
+
+ public RequestBrokerProcessor(MessagingProcessor messagingProcessor,
+ ServiceManager serviceManager) {
+ super(messagingProcessor, serviceManager);
+ }
+
+ CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) {
+ return serviceManager.getMessageService().request(ctx, brokerName, request, timeoutMillis);
+ }
+
+ CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request, long timeoutMillis) {
+ return serviceManager.getMessageService().requestOneway(ctx, brokerName, request, timeoutMillis);
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
new file mode 100644
index 000000000..7f0d891ec
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractRemotingActivity implements NettyRequestProcessor {
+ protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ protected final MessagingProcessor messagingProcessor;
+ protected static final String BROKER_NAME_FIELD = "bname";
+ private static final Map<ProxyExceptionCode, Integer> PROXY_EXCEPTION_RESPONSE_CODE_MAP = new HashMap<ProxyExceptionCode, Integer>() {
+ {
+ put(ProxyExceptionCode.FORBIDDEN, ResponseCode.NO_PERMISSION);
+ put(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, ResponseCode.MESSAGE_ILLEGAL);
+ put(ProxyExceptionCode.INTERNAL_SERVER_ERROR, ResponseCode.SYSTEM_ERROR);
+ put(ProxyExceptionCode.TRANSACTION_DATA_NOT_FOUND, ResponseCode.SUCCESS);
+ }
+ };
+ protected final RequestPipeline requestPipeline;
+
+ public AbstractRemotingActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+ this.requestPipeline = requestPipeline;
+ this.messagingProcessor = messagingProcessor;
+ }
+
+ protected RemotingCommand request(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context, long timeoutMillis) throws Exception {
+ if (request.getExtFields().get(BROKER_NAME_FIELD) == null) {
+ return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED,
+ "Request doesn't have field bname");
+ }
+ String brokerName = request.getExtFields().get(BROKER_NAME_FIELD);
+ if (request.isOnewayRPC()) {
+ return null;
+ }
+ messagingProcessor.request(context, brokerName, request, timeoutMillis)
+ .thenAccept(r -> writeResponse(ctx, context, request, r))
+ .exceptionally(t -> {
+ writeErrResponse(ctx, context, request, t);
+ return null;
+ });
+ return null;
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+ ProxyContext context = createContext(ctx, request);
+ try {
+ this.requestPipeline.execute(ctx, request, context);
+ RemotingCommand response = this.processRequest0(ctx, request, context);
+ if (response != null) {
+ writeResponse(ctx, context, request, response);
+ }
+ return null;
+ } catch (Throwable t) {
+ writeErrResponse(ctx, context, request, t);
+ return null;
+ }
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
+ protected abstract RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception;
+
+ protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand request) {
+ ProxyContext context = ProxyContext.create();
+ context.setAction("Remoting" + request.getCode())
+ .setLanguage(request.getLanguage().name())
+ .setChannel(ctx.channel())
+ .setLocalAddress(RemotingUtil.socketAddress2String(ctx.channel().localAddress()))
+ .setRemoteAddress(RemotingUtil.socketAddress2String(ctx.channel().remoteAddress()));
+
+ return context;
+ }
+
+ protected void writeErrResponse(ChannelHandlerContext ctx, final ProxyContext context,
+ final RemotingCommand request, Throwable t) {
+ t = ExceptionUtils.getRealException(t);
+ if (t instanceof ProxyException) {
+ ProxyException e = (ProxyException) t;
+ writeResponse(ctx, context, request,
+ RemotingCommand.createResponseCommand(
+ PROXY_EXCEPTION_RESPONSE_CODE_MAP.getOrDefault(e.getCode(), ResponseCode.SYSTEM_ERROR),
+ e.getMessage()),
+ t);
+ } else if (t instanceof MQClientException) {
+ MQClientException e = (MQClientException) t;
+ writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()), t);
+ } else if (t instanceof MQBrokerException) {
+ MQBrokerException e = (MQBrokerException) t;
+ writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage()), t);
+ } else if (t instanceof AclException) {
+ writeResponse(ctx, context, request, RemotingCommand.createResponseCommand(ResponseCode.NO_PERMISSION, t.getMessage()), t);
+ } else {
+ writeResponse(ctx, context, request,
+ RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, t.getMessage()), t);
+ }
+ }
+
+ protected void writeResponse(ChannelHandlerContext ctx, final ProxyContext context,
+ final RemotingCommand request, RemotingCommand response) {
+ writeResponse(ctx, context, request, response, null);
+ }
+
+ protected void writeResponse(ChannelHandlerContext ctx, final ProxyContext context,
+ final RemotingCommand request, RemotingCommand response, Throwable t) {
+ if (request.isOnewayRPC()) {
+ return;
+ }
+ if (!ctx.channel().isWritable()) {
+ return;
+ }
+
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.addExtField(MessageConst.PROPERTY_MSG_REGION, config.getRegionId());
+ response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(config.isTraceOn()));
+ if (t != null) {
+ response.setRemark(t.getMessage());
+ }
+
+ ctx.writeAndFlush(response);
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java
new file mode 100644
index 000000000..723b5918b
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AckMessageActivity.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class AckMessageActivity extends AbstractRemotingActivity {
+ public AckMessageActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java
new file mode 100644
index 000000000..9f6de99e0
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ChangeInvisibleTimeActivity.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class ChangeInvisibleTimeActivity extends AbstractRemotingActivity {
+ public ChangeInvisibleTimeActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
new file mode 100644
index 000000000..fb248a894
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Set;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class ConsumerManagerActivity extends AbstractRemotingActivity {
+ public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ switch (request.getCode()) {
+ case RequestCode.GET_CONSUMER_LIST_BY_GROUP: {
+ return getConsumerListByGroup(ctx, request, context);
+ }
+ case RequestCode.LOCK_BATCH_MQ: {
+ return lockBatchMQ(ctx, request, context);
+ }
+ case RequestCode.UNLOCK_BATCH_MQ: {
+ return unlockBatchMQ(ctx, request, context);
+ }
+ case RequestCode.UPDATE_CONSUMER_OFFSET:
+ case RequestCode.QUERY_CONSUMER_OFFSET:
+ case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
+ case RequestCode.GET_MIN_OFFSET:
+ case RequestCode.GET_MAX_OFFSET:
+ case RequestCode.GET_EARLIEST_MSG_STORETIME: {
+ return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+ }
+ default:
+ break;
+ }
+ return null;
+ }
+
+ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ // TODO after connection-related module
+ return null;
+ }
+
+ protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
+ Set<MessageQueue> mqSet = requestBody.getMqSet();
+ if (mqSet.isEmpty()) {
+ response.setBody(requestBody.encode());
+ response.setRemark("MessageQueue set is empty");
+ return response;
+ }
+
+ String brokerName = new ArrayList<>(mqSet).get(0).getBrokerName();
+ messagingProcessor.request(context, brokerName, request, Duration.ofSeconds(3).toMillis())
+ .thenAccept(r -> writeResponse(ctx, context, request, r))
+ .exceptionally(t -> {
+ writeErrResponse(ctx, context, request, t);
+ return null;
+ });
+ return null;
+ }
+
+ protected RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
+ Set<MessageQueue> mqSet = requestBody.getMqSet();
+ if (mqSet.isEmpty()) {
+ response.setBody(requestBody.encode());
+ response.setRemark("MessageQueue set is empty");
+ return response;
+ }
+
+ String brokerName = new ArrayList<>(mqSet).get(0).getBrokerName();
+ messagingProcessor.request(context, brokerName, request, Duration.ofSeconds(3).toMillis())
+ .thenAccept(r -> writeResponse(ctx, context, request, r))
+ .exceptionally(t -> {
+ writeErrResponse(ctx, context, request, t);
+ return null;
+ });
+ return null;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
new file mode 100644
index 000000000..d3b7de98d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.google.common.net.HostAndPort;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class GetTopicRouteActivity extends AbstractRemotingActivity {
+ public GetTopicRouteActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final GetRouteInfoRequestHeader requestHeader =
+ (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+ List<Address> addressList = new ArrayList<>();
+ addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromString(proxyConfig.getRemotingAccessPoint())));
+ ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic());
+ TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData();
+
+ byte[] content;
+ Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();
+ if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || null != standardJsonOnly && standardJsonOnly) {
+ content = topicRouteData.encode(SerializerFeature.BrowserCompatible,
+ SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
+ SerializerFeature.MapSortField);
+ } else {
+ content = topicRouteData.encode();
+ }
+
+ response.setBody(content);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
new file mode 100644
index 000000000..d52b84b12
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class PopMessageActivity extends AbstractRemotingActivity {
+ public PopMessageActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ PopMessageRequestHeader popMessageRequestHeader = (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class);
+ long timeoutMillis = popMessageRequestHeader.getPollTime();
+ return request(ctx, request, context, timeoutMillis + Duration.ofSeconds(10).toMillis());
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
new file mode 100644
index 000000000..819bf139d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class PullMessageActivity extends AbstractRemotingActivity {
+ public PullMessageActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ if (request.getExtFields().get(BROKER_NAME_FIELD) == null) {
+ return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED,
+ "Request doesn't have field bname");
+ }
+ PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+ if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) {
+ ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
+ if (consumerInfo == null) {
+ return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
+ "the consumer's subscription not latest");
+ }
+ SubscriptionData subscriptionData = consumerInfo.findSubscriptionData(requestHeader.getTopic());
+ if (subscriptionData == null) {
+ return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_EXIST,
+ "the consumer's subscription not exist");
+ }
+ requestHeader.setSubscription(subscriptionData.getSubString());
+ requestHeader.setExpressionType(subscriptionData.getExpressionType());
+ request.makeCustomHeaderToNet();
+ }
+ String brokerName = requestHeader.getBname();
+ long timeoutMillis = requestHeader.getSuspendTimeoutMillis() + Duration.ofSeconds(10).toMillis();
+ CompletableFuture<RemotingCommand> future = messagingProcessor.request(context, brokerName, request, timeoutMillis);
+ future.thenAccept(r -> writeResponse(ctx, context, request, r))
+ .exceptionally(t -> {
+ writeErrResponse(ctx, context, request, t);
+ return null;
+ });
+ return null;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
new file mode 100644
index 000000000..904460431
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
+import java.util.Map;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.processor.validator.DefaultTopicMessageTypeValidator;
+import org.apache.rocketmq.proxy.processor.validator.TopicMessageTypeValidator;
+import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class SendMessageActivity extends AbstractRemotingActivity {
+ TopicMessageTypeValidator topicMessageTypeValidator;
+
+ public SendMessageActivity(RequestPipeline requestPipeline,
+ MessagingProcessor messagingProcessor) {
+ super(requestPipeline, messagingProcessor);
+ this.topicMessageTypeValidator = new DefaultTopicMessageTypeValidator();
+ }
+
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ switch (request.getCode()) {
+ case RequestCode.SEND_MESSAGE:
+ case RequestCode.SEND_MESSAGE_V2:
+ case RequestCode.SEND_BATCH_MESSAGE: {
+ return sendMessage(ctx, request, context);
+ }
+ case RequestCode.CONSUMER_SEND_MSG_BACK: {
+ return consumerSendMessage(ctx, request, context);
+ }
+ default:
+ break;
+ }
+ return null;
+ }
+
+ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ SendMessageRequestHeader requestHeader = SendMessageRequestHeader.parseRequestHeader(request);
+ String topic = requestHeader.getTopic();
+ Map<String, String> property = MessageDecoder.string2messageProperties(requestHeader.getProperties());
+ TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(property);
+ if (ConfigurationManager.getProxyConfig().isEnableTopicMessageTypeCheck()) {
+ if (topicMessageTypeValidator != null) {
+ // Do not check retry or dlq topic
+ if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
+ TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topic);
+ topicMessageTypeValidator.validate(topicMessageType, messageType);
+ }
+ }
+ }
+ if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
+ if (TopicMessageType.TRANSACTION.equals(messageType)) {
+ return sendTransactionMessage(ctx, request, context);
+ }
+ }
+ return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+ }
+
+ protected RemotingCommand consumerSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ return request(ctx, request, context, Duration.ofSeconds(3).toMillis());
+ }
+
+ protected RemotingCommand sendTransactionMessage(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ // TODO: wait for connection implement.
+ return null;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java
similarity index 54%
copy from common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
copy to proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java
index 8c484da31..4c46a6e7d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/RequestPipeline.java
@@ -15,32 +15,20 @@
* limitations under the License.
*/
-package org.apache.rocketmq.common.attribute;
+package org.apache.rocketmq.proxy.remoting.pipeline;
-import com.google.common.collect.Sets;
-import java.util.Set;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public enum TopicMessageType {
- UNSPECIFIED("UNSPECIFIED"),
- NORMAL("NORMAL"),
- FIFO("FIFO"),
- DELAY("DELAY"),
- TRANSACTION("TRANSACTION");
+public interface RequestPipeline {
- private final String value;
- TopicMessageType(String value) {
- this.value = value;
- }
-
- public static Set<String> topicMessageTypeSet() {
- return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value);
- }
-
- public String getValue() {
- return value;
- }
+ void execute(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception;
- public String getMetricsValue() {
- return value.toLowerCase();
+ default RequestPipeline pipe(RequestPipeline source) {
+ return (ctx, request, context) -> {
+ source.execute(ctx, request, context);
+ execute(ctx, request, context);
+ };
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
index ff7ef01a0..04ad5e269 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/SimpleChannel.java
@@ -175,6 +175,11 @@ public class SimpleChannel extends AbstractChannel {
return promise;
}
+ @Override
+ public boolean isWritable() {
+ return true;
+ }
+
public void updateLastAccessTime() {
this.lastAccessTime = System.currentTimeMillis();
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index f27f002d0..c2a5a6435 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -63,13 +63,13 @@ public class ClusterMessageService implements MessageService {
CompletableFuture<List<SendResult>> future;
if (msgList.size() == 1) {
future = this.mqClientAPIFactory.getClient().sendMessageAsync(
- messageQueue.getBrokerAddr(),
- messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis)
+ messageQueue.getBrokerAddr(),
+ messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis)
.thenApply(Lists::newArrayList);
} else {
future = this.mqClientAPIFactory.getClient().sendMessageAsync(
- messageQueue.getBrokerAddr(),
- messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis)
+ messageQueue.getBrokerAddr(),
+ messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis)
.thenApply(Lists::newArrayList);
}
return future;
@@ -86,7 +86,8 @@ public class ClusterMessageService implements MessageService {
}
@Override
- public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName, EndTransactionRequestHeader requestHeader,
+ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName,
+ EndTransactionRequestHeader requestHeader,
long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
@@ -205,6 +206,32 @@ public class ClusterMessageService implements MessageService {
);
}
+ @Override
+ public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+ try {
+ String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+ return mqClientAPIFactory.getClient().invoke(brokerAddress, request, timeoutMillis);
+ } catch (Exception e) {
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+ try {
+ String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+ return mqClientAPIFactory.getClient().invokeOneway(brokerAddress, request, timeoutMillis);
+ } catch (Exception e) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
protected String resolveBrokerAddrInReceiptHandle(ReceiptHandle handle) {
try {
return this.topicRouteService.getBrokerAddr(handle.getBrokerName());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 491926d01..115c140ff 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -402,4 +402,16 @@ public class LocalMessageService implements MessageService {
GetMinOffsetRequestHeader requestHeader, long timeoutMillis) {
throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService");
}
+
+ @Override
+ public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+ throw new NotImplementedException("request is not implemented in LocalMessageService");
+ }
+
+ @Override
+ public CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis) {
+ throw new NotImplementedException("requestOneway is not implemented in LocalMessageService");
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 18673b505..15da17154 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -139,4 +139,10 @@ public interface MessageService {
GetMinOffsetRequestHeader requestHeader,
long timeoutMillis
);
+
+ CompletableFuture<RemotingCommand> request(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis);
+
+ CompletableFuture<Void> requestOneway(ProxyContext ctx, String brokerName, RemotingCommand request,
+ long timeoutMillis);
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
new file mode 100644
index 000000000..b581d8a91
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.activity;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
+ AbstractRemotingActivity remotingActivity;
+ @Mock
+ MessagingProcessor messagingProcessorMock;
+ @Spy
+ ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) {
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+ return null;
+ }
+ };
+
+ @Before
+ public void setup() {
+ remotingActivity = new AbstractRemotingActivity(null, messagingProcessorMock) {
+ @Override
+ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request,
+ ProxyContext context) throws Exception {
+ return null;
+ }
+ };
+ }
+
+ @Test
+ public void request() throws Exception {
+ String brokerName = "broker";
+ String remark = "success";
+ when(messagingProcessorMock.request(any(), anyString(), any(), anyLong())).thenReturn(CompletableFuture.completedFuture(
+ RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, remark)
+ ));
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+ request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName);
+ RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000);
+ assertThat(remotingCommand).isNull();
+ verify(ctx, times(1)).writeAndFlush(any());
+ }
+}
\ No newline at end of file