You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2022/12/01 15:17:05 UTC

[GitHub] [camel] TeslaCN opened a new pull request, #8820: CAMEL-14831: Add camel-rocketmq component

TeslaCN opened a new pull request, #8820:
URL: https://github.com/apache/camel/pull/8820

   https://issues.apache.org/jira/browse/CAMEL-14831
   
   The RocketMQ component allows you to produce and consume messages from [RocketMQ](https://rocketmq.apache.org/) clusters.
   
   
   - [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/browse/CAMEL) filed for the change (usually before you start working on it).  Trivial changes like typos do not require a JIRA issue.  Your pull request should address just this issue, without pulling in other changes.
   - [x] Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Run `mvn clean install -Psourcecheck` in your module with source check enabled to make sure basic checks pass and there are no checkstyle violations. A more thorough check will be performed on your pull request automatically.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1334016508

   ### Components tested:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 2 | 2 | 0 | 1 |


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] essobedo commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
essobedo commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1335310743

   If you rebase against the main branch it should be good now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] zhfeng commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
zhfeng commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1340499074

   @TeslaCN I think it is reasonable since this is a new component. It should be good to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1340699743

   Thank you all for helping reviewing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1337569859

   ### Components tested:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 2 | 2 | 0 | 1 |


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1038986808


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/AclUtils.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public final class AclUtils {
+
+    private AclUtils() {
+    }
+
+    public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+        if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {

Review Comment:
   Actually I also want to avoid using null. But the constructors of RocketMQ classes accept null as default value. Passing null into RocketMQ's classes will not cause NPE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] orpiske commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1039441141


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/AclUtils.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public final class AclUtils {
+
+    private AclUtils() {
+    }
+
+    public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+        if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {

Review Comment:
   Great, in this case it should be fine. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1039664174


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public ReplyTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+        addListener(this::listener);
+    }
+
+    private static long encode(long timeoutMillis) {
+        return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE;
+    }
+
+    private void listener(Listener.Type type, String key, ReplyHandler handler) {
+        switch (type) {
+            case Put:
+                log.trace("Add messageKey: {}", key);
+                break;
+            case Remove:
+                log.trace("Remove messageKey: {}", key);
+                break;
+            case Evict:
+                try {
+                    handler.onTimeout(key);
+                } catch (Throwable e) {
+                    log.warn("Error processing onTimeout for messageKey: " + key + " due: " + e.getLocalizedMessage()
+                             + ". This exception is ignored.",
+                            e);

Review Comment:
   I've added marker named `camel-rocketmq`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1335244149

   The failures are weird.
   <img width="1350" alt="image" src="https://user-images.githubusercontent.com/20503072/205304540-475db5af-c689-4821-a611-86ceedce66bb.png">
   
   <img width="1680" alt="image" src="https://user-images.githubusercontent.com/20503072/205304642-90c7785f-d1ad-4e45-ba86-ae4266cc0e3b.png">


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1336451426

   ### Components tested:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 2 | 2 | 1 | 0 |


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1038986243


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQConsumer.class);
+
+    private final RocketMQEndpoint endpoint;
+
+    private DefaultMQPushConsumer mqPushConsumer;
+
+    public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    private void startConsumer() throws MQClientException {
+        if (mqPushConsumer != null) {
+            LOG.warn("Overriding RocketMQ Consumer! {}", mqPushConsumer);

Review Comment:
   I was overthinking before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1337608056

   ### Components tested:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 2 | 2 | 0 | 1 |


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1340491007

   The checkstyle GitHub Action always failed due to could not resolve dependencies. Did I missed something?
   
   ![Uploading image.png…]()
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1038985863


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQConsumer.class);
+
+    private final RocketMQEndpoint endpoint;
+
+    private DefaultMQPushConsumer mqPushConsumer;
+
+    public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    private void startConsumer() throws MQClientException {
+        if (mqPushConsumer != null) {
+            LOG.warn("Overriding RocketMQ Consumer! {}", mqPushConsumer);
+        }
+        mqPushConsumer = new DefaultMQPushConsumer(
+                null, endpoint.getConsumerGroup(),
+                AclUtils.getAclRPCHook(getEndpoint().getAccessKey(), getEndpoint().getSecretKey()));
+        mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+        mqPushConsumer.subscribe(endpoint.getTopicName(), endpoint.getSubscribeTags());
+        mqPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            MessageExt messageExt = msgs.get(0);
+            Exchange exchange = endpoint.createRocketExchange(messageExt.getBody());
+            new RocketMQMessageConverter().setExchangeHeadersByMessageExt(exchange, messageExt);

Review Comment:
   I've refactored methods of RocketMQMessageConverter to static. Before I created a new instance every time because I thought the JVM could allocate instance of this class on stack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] orpiske commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1038931474


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.component.rocketmq.RocketMQEndpoint;
+import org.apache.camel.component.rocketmq.RocketMQMessageConverter;
+import org.apache.camel.component.rocketmq.RocketMQProducer;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQReplyManagerSupport extends ServiceSupport implements ReplyManager {
+
+    private static final int CLOSE_TIMEOUT = 30 * 1000;
+
+    protected final Logger log = LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
+    protected final CamelContext camelContext;
+    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
+    protected ScheduledExecutorService executorService;
+    protected RocketMQEndpoint endpoint;
+    protected String replyToTopic;
+    protected DefaultMQPushConsumer mqPushConsumer;
+    protected ReplyTimeoutMap timeoutMap;
+    private final RocketMQMessageConverter messageConverter = new RocketMQMessageConverter();
+
+    public RocketMQReplyManagerSupport(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(executorService, "executorService", this);
+        ObjectHelper.notNull(endpoint, "endpoint", this);
+
+        log.debug("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerIntervalMillis());
+        timeoutMap = new ReplyTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerIntervalMillis());
+        ServiceHelper.startService(timeoutMap);
+
+        mqPushConsumer = createConsumer();
+        mqPushConsumer.start();
+
+        log.debug("Using executor {}", executorService);
+    }
+
+    protected DefaultMQPushConsumer createConsumer() throws MQClientException {
+        setReplyToTopic(endpoint.getReplyToTopic());
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
+        consumer.setConsumerGroup(endpoint.getReplyToConsumerGroup());
+        consumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+        consumer.subscribe(replyToTopic, "*");
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            MessageExt messageExt = msgs.get(0);
+            onMessage(messageExt);
+            log.trace("Consume message {}", messageExt);
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        return consumer;
+    }
+
+    public void onMessage(MessageExt messageExt) {
+        String messageKey = Arrays.stream(messageExt.getKeys().split(MessageConst.KEY_SEPARATOR))
+                .filter(s -> s.startsWith(RocketMQProducer.GENERATE_MESSAGE_KEY_PREFIX)).findFirst().orElse(null);
+        if (messageKey == null) {
+            log.warn("Ignoreing message with no messageKey: {}", messageExt);

Review Comment:
   Typo ... should be "Ignoring". 



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/AclUtils.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public final class AclUtils {
+
+    private AclUtils() {
+    }
+
+    public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+        if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {

Review Comment:
   Maybe replace this `StringUtils.isNotBlank` with Camel's `StringHelper.notBlank(String, String, String)` and prevent the NPE that could happen on ```RocketMQConsumer.startConsumer```? 



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public ReplyTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+        addListener(this::listener);
+    }
+
+    private static long encode(long timeoutMillis) {
+        return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE;
+    }
+
+    private void listener(Listener.Type type, String key, ReplyHandler handler) {
+        switch (type) {
+            case Put:
+                log.trace("Add messageKey: {}", key);
+                break;
+            case Remove:
+                log.trace("Remove messageKey: {}", key);
+                break;
+            case Evict:
+                try {
+                    handler.onTimeout(key);
+                } catch (Throwable e) {
+                    log.warn("Error processing onTimeout for messageKey: " + key + " due: " + e.getLocalizedMessage()
+                             + ". This exception is ignored.",
+                            e);

Review Comment:
   Please, can you use log markers here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] orpiske commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1041240275


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public ReplyTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+        addListener(this::listener);
+    }
+
+    private static long encode(long timeoutMillis) {
+        return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE;
+    }
+
+    private void listener(Listener.Type type, String key, ReplyHandler handler) {
+        switch (type) {
+            case Put:
+                log.trace("Add messageKey: {}", key);
+                break;
+            case Remove:
+                log.trace("Remove messageKey: {}", key);
+                break;
+            case Evict:
+                try {
+                    handler.onTimeout(key);
+                } catch (Throwable e) {
+                    log.warn("Error processing onTimeout for messageKey: " + key + " due: " + e.getLocalizedMessage()
+                             + ". This exception is ignored.",
+                            e);

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1335175946

   > The header keys are using dots `rocket.xxx` is there a specific reason for that? The standard for Camel components is to not use dots, and naming style
   > 
   > "CamelComponentNameXXX" eg CamelRockerMQSomeKeyName
   > 
   > See for example camel-jms and its keys.
   
   Thanks for your suggestion. I've renamed the header keys.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1333924185

   :star2: Thank you for your contribution to the Apache Camel project! :star2: 
   
   :warning: Please note that the changes on this PR may be **tested automatically**. 
   
   If necessary Apache Camel Committers may access logs and test results in the job summaries!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1038986297


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.spi.Metadata;
+
+public final class RocketMQConstants {

Review Comment:
   I've added labels to annotations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] orpiske commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
orpiske commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1337134653

   > ### Components tested:
   > Total 	Tested 	Failed x 	Passed white_check_mark
   > 2 	2 	1 	0
   
   Currently failing due to a checkstyle issue: 
   
   ```
   [camel-rocketmq] [INFO] Starting audit...
   [ERROR] /home/runner/work/camel/camel/components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQMessageConverter.java:23:1: Utility classes should not have a public or default constructor. [HideUtilityClassConstructor]
   Audit done.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] orpiske commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1039670321


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public ReplyTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+        addListener(this::listener);
+    }
+
+    private static long encode(long timeoutMillis) {
+        return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE;
+    }
+
+    private void listener(Listener.Type type, String key, ReplyHandler handler) {
+        switch (type) {
+            case Put:
+                log.trace("Add messageKey: {}", key);
+                break;
+            case Remove:
+                log.trace("Remove messageKey: {}", key);
+                break;
+            case Evict:
+                try {
+                    handler.onTimeout(key);
+                } catch (Throwable e) {
+                    log.warn("Error processing onTimeout for messageKey: " + key + " due: " + e.getLocalizedMessage()
+                             + ". This exception is ignored.",
+                            e);

Review Comment:
   I mean something like: 
   
   ```
   log.warn("Error processing onTimeout for messageKey: {} due: {}. This exception is ignored.", key, e.getLocalizedMessage(), e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1039712499


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/reply/ReplyTimeoutMap.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+public class ReplyTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public ReplyTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+        addListener(this::listener);
+    }
+
+    private static long encode(long timeoutMillis) {
+        return timeoutMillis > 0 ? timeoutMillis : Integer.MAX_VALUE;
+    }
+
+    private void listener(Listener.Type type, String key, ReplyHandler handler) {
+        switch (type) {
+            case Put:
+                log.trace("Add messageKey: {}", key);
+                break;
+            case Remove:
+                log.trace("Remove messageKey: {}", key);
+                break;
+            case Evict:
+                try {
+                    handler.onTimeout(key);
+                } catch (Throwable e) {
+                    log.warn("Error processing onTimeout for messageKey: " + key + " due: " + e.getLocalizedMessage()
+                             + ". This exception is ignored.",
+                            e);

Review Comment:
   Sorry that I misunderstood. I've updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1340482720

   ### Components tested:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 2 | 2 | 0 | 1 |


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] essobedo commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
essobedo commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1335264597

   This failure is not related to your changes, it happens already in the main branch. Let me see if I can fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] TeslaCN commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
TeslaCN commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1334693293

   It seems that the checkstyle failure was not caused by this PR.
   
   ![image](https://user-images.githubusercontent.com/20503072/205202955-0ff23b21-78ff-40b4-a1e1-3d63eb14a29c.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] davsclaus commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
davsclaus commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1335150345

   The header keys are using dots `rocket.xxx` is there a specific reason for that?
   The standard for Camel components is to not use dots, and naming style
   
   "CamelComponentNameXXX" eg CamelRockerMQSomeKeyName
   
   See for example camel-jms and its keys.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] github-actions[bot] commented on pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #8820:
URL: https://github.com/apache/camel/pull/8820#issuecomment-1335489267

   ### Components tested:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 2 | 2 | 0 | 1 |


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] davsclaus commented on a diff in pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
davsclaus commented on code in PR #8820:
URL: https://github.com/apache/camel/pull/8820#discussion_r1038750577


##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+@Component("rocketmq")
+public class RocketMQComponent extends DefaultComponent {
+
+    @Metadata(label = "producer")
+    private String producerGroup;
+
+    @Metadata(label = "consumer")
+    private String consumerGroup;
+
+    @Metadata(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+
+    @Metadata(label = "common")
+    private String sendTag = "";
+
+    @Metadata(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+
+    @Metadata(label = "producer")
+    private String replyToTopic;
+
+    @Metadata(label = "producer")
+    private String replyToConsumerGroup;
+
+    @Metadata(label = "advance", defaultValue = "10000")

Review Comment:
   advanced



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+@Component("rocketmq")
+public class RocketMQComponent extends DefaultComponent {
+
+    @Metadata(label = "producer")
+    private String producerGroup;
+
+    @Metadata(label = "consumer")
+    private String consumerGroup;
+
+    @Metadata(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+
+    @Metadata(label = "common")
+    private String sendTag = "";
+
+    @Metadata(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+
+    @Metadata(label = "producer")
+    private String replyToTopic;
+
+    @Metadata(label = "producer")
+    private String replyToConsumerGroup;
+
+    @Metadata(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+
+    @Metadata(label = "advance", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+
+    @Metadata(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+
+    @Metadata(label = "accessKey")

Review Comment:
   label = secret, secure = true



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.AsyncEndpoint;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultMessage;
+
+/**
+ * Send and receive messages from <a href="https://rocketmq.apache.org/">RocketMQ</a> cluster.
+ */
+@UriEndpoint(firstVersion = "3.20.0", scheme = "rocketmq", syntax = "rocketmq:topicName", title = "RocketMQ",
+             category = Category.MESSAGING, headersClass = RocketMQConstants.class)
+public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
+
+    @UriPath
+    @Metadata(required = true)
+    private String topicName;
+    @UriParam(label = "producer")
+    private String producerGroup;
+    @UriParam(label = "consumer")
+    private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+    @UriParam(label = "producer")
+    private String sendTag = "";
+    @UriParam(label = "producer")
+    private String replyToTopic;
+    @UriParam(label = "producer")
+    private String replyToConsumerGroup;
+    @UriParam(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+    @UriParam(label = "advance", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+    @UriParam(label = "accessKey")

Review Comment:
   label = security, secret = true



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQConsumer.class);
+
+    private final RocketMQEndpoint endpoint;
+
+    private DefaultMQPushConsumer mqPushConsumer;
+
+    public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    private void startConsumer() throws MQClientException {
+        if (mqPushConsumer != null) {
+            LOG.warn("Overriding RocketMQ Consumer! {}", mqPushConsumer);
+        }
+        mqPushConsumer = new DefaultMQPushConsumer(
+                null, endpoint.getConsumerGroup(),
+                AclUtils.getAclRPCHook(getEndpoint().getAccessKey(), getEndpoint().getSecretKey()));
+        mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+        mqPushConsumer.subscribe(endpoint.getTopicName(), endpoint.getSubscribeTags());
+        mqPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            MessageExt messageExt = msgs.get(0);
+            Exchange exchange = endpoint.createRocketExchange(messageExt.getBody());
+            new RocketMQMessageConverter().setExchangeHeadersByMessageExt(exchange, messageExt);
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                LOG.error(e.getLocalizedMessage());

Review Comment:
   The consumer has exception handler to deal with this instead of hardcoded LOG
   Something ala: getExceptionHandler().handle
   
   



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.AsyncEndpoint;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultMessage;
+
+/**
+ * Send and receive messages from <a href="https://rocketmq.apache.org/">RocketMQ</a> cluster.
+ */
+@UriEndpoint(firstVersion = "3.20.0", scheme = "rocketmq", syntax = "rocketmq:topicName", title = "RocketMQ",
+             category = Category.MESSAGING, headersClass = RocketMQConstants.class)
+public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
+
+    @UriPath
+    @Metadata(required = true)
+    private String topicName;
+    @UriParam(label = "producer")
+    private String producerGroup;
+    @UriParam(label = "consumer")
+    private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+    @UriParam(label = "producer")
+    private String sendTag = "";
+    @UriParam(label = "producer")
+    private String replyToTopic;
+    @UriParam(label = "producer")
+    private String replyToConsumerGroup;
+    @UriParam(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+    @UriParam(label = "advance", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+    @UriParam(label = "accessKey")
+    private String accessKey;
+    @UriParam(label = "secretKey")
+    private String secretKey;
+
+    public RocketMQEndpoint() {
+    }
+
+    public RocketMQEndpoint(String endpointUri, RocketMQComponent component) {
+        super(endpointUri, component);
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new RocketMQProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        RocketMQConsumer consumer = new RocketMQConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    @Override
+    public boolean isSingleton() {

Review Comment:
   This is default so can be removed



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+@Component("rocketmq")
+public class RocketMQComponent extends DefaultComponent {
+
+    @Metadata(label = "producer")
+    private String producerGroup;
+
+    @Metadata(label = "consumer")
+    private String consumerGroup;
+
+    @Metadata(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+
+    @Metadata(label = "common")
+    private String sendTag = "";
+
+    @Metadata(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+
+    @Metadata(label = "producer")
+    private String replyToTopic;
+
+    @Metadata(label = "producer")
+    private String replyToConsumerGroup;
+
+    @Metadata(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+
+    @Metadata(label = "advance", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+
+    @Metadata(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+
+    @Metadata(label = "accessKey")
+    private String accessKey;
+
+    @Metadata(label = "secretKey")

Review Comment:
   label = secret, secure = true



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQComponent.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+
+@Component("rocketmq")
+public class RocketMQComponent extends DefaultComponent {
+
+    @Metadata(label = "producer")
+    private String producerGroup;
+
+    @Metadata(label = "consumer")
+    private String consumerGroup;
+
+    @Metadata(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+
+    @Metadata(label = "common")
+    private String sendTag = "";
+
+    @Metadata(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+
+    @Metadata(label = "producer")
+    private String replyToTopic;
+
+    @Metadata(label = "producer")
+    private String replyToConsumerGroup;
+
+    @Metadata(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+
+    @Metadata(label = "advance", defaultValue = "1000")

Review Comment:
   advanced



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConstants.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.spi.Metadata;
+
+public final class RocketMQConstants {

Review Comment:
   Are some of these headers ONLY used on consumer or producer, then we need to mark this as well. @essobedo can help with this



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQConsumer.class);
+
+    private final RocketMQEndpoint endpoint;
+
+    private DefaultMQPushConsumer mqPushConsumer;
+
+    public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    private void startConsumer() throws MQClientException {
+        if (mqPushConsumer != null) {
+            LOG.warn("Overriding RocketMQ Consumer! {}", mqPushConsumer);
+        }
+        mqPushConsumer = new DefaultMQPushConsumer(
+                null, endpoint.getConsumerGroup(),
+                AclUtils.getAclRPCHook(getEndpoint().getAccessKey(), getEndpoint().getSecretKey()));
+        mqPushConsumer.setNamesrvAddr(endpoint.getNamesrvAddr());
+        mqPushConsumer.subscribe(endpoint.getTopicName(), endpoint.getSubscribeTags());
+        mqPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            MessageExt messageExt = msgs.get(0);
+            Exchange exchange = endpoint.createRocketExchange(messageExt.getBody());
+            new RocketMQMessageConverter().setExchangeHeadersByMessageExt(exchange, messageExt);

Review Comment:
   Do you really need to create a new instance per message of this converter?



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQConsumer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Suspendable;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQConsumer extends DefaultConsumer implements Suspendable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQConsumer.class);
+
+    private final RocketMQEndpoint endpoint;
+
+    private DefaultMQPushConsumer mqPushConsumer;
+
+    public RocketMQConsumer(RocketMQEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    private void startConsumer() throws MQClientException {
+        if (mqPushConsumer != null) {
+            LOG.warn("Overriding RocketMQ Consumer! {}", mqPushConsumer);

Review Comment:
   When will this happen? eg the state of a consumer should that it handles start/stop correctly



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.AsyncEndpoint;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultMessage;
+
+/**
+ * Send and receive messages from <a href="https://rocketmq.apache.org/">RocketMQ</a> cluster.
+ */
+@UriEndpoint(firstVersion = "3.20.0", scheme = "rocketmq", syntax = "rocketmq:topicName", title = "RocketMQ",
+             category = Category.MESSAGING, headersClass = RocketMQConstants.class)
+public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
+
+    @UriPath
+    @Metadata(required = true)
+    private String topicName;
+    @UriParam(label = "producer")
+    private String producerGroup;
+    @UriParam(label = "consumer")
+    private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+    @UriParam(label = "producer")
+    private String sendTag = "";
+    @UriParam(label = "producer")
+    private String replyToTopic;
+    @UriParam(label = "producer")
+    private String replyToConsumerGroup;
+    @UriParam(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+    @UriParam(label = "advance", defaultValue = "1000")

Review Comment:
   advanced



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQEndpoint.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import org.apache.camel.AsyncEndpoint;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultMessage;
+
+/**
+ * Send and receive messages from <a href="https://rocketmq.apache.org/">RocketMQ</a> cluster.
+ */
+@UriEndpoint(firstVersion = "3.20.0", scheme = "rocketmq", syntax = "rocketmq:topicName", title = "RocketMQ",
+             category = Category.MESSAGING, headersClass = RocketMQConstants.class)
+public class RocketMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
+
+    @UriPath
+    @Metadata(required = true)
+    private String topicName;
+    @UriParam(label = "producer")
+    private String producerGroup;
+    @UriParam(label = "consumer")
+    private String consumerGroup;
+    @UriParam(label = "consumer", defaultValue = "*")
+    private String subscribeTags = "*";
+    @UriParam(label = "producer")
+    private String sendTag = "";
+    @UriParam(label = "producer")
+    private String replyToTopic;
+    @UriParam(label = "producer")
+    private String replyToConsumerGroup;
+    @UriParam(label = "common", defaultValue = "localhost:9876")
+    private String namesrvAddr = "localhost:9876";
+    @UriParam(label = "advance", defaultValue = "10000")
+    private long requestTimeoutMillis = 10000L;
+    @UriParam(label = "advance", defaultValue = "1000")
+    private long requestTimeoutCheckerIntervalMillis = 1000L;
+    @UriParam(label = "producer", defaultValue = "false")
+    private boolean waitForSendResult;
+    @UriParam(label = "accessKey")
+    private String accessKey;
+    @UriParam(label = "secretKey")

Review Comment:
   label = security, secret = true



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.rocketmq.reply.ReplyManager;
+import org.apache.camel.component.rocketmq.reply.RocketMQReplyManagerSupport;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQProducer extends DefaultAsyncProducer {
+
+    public static final String GENERATE_MESSAGE_KEY_PREFIX = "camel-rocketmq-";
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class);
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private DefaultMQProducer mqProducer;
+
+    private ReplyManager replyManager;
+
+    public RocketMQProducer(RocketMQEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public RocketMQEndpoint getEndpoint() {
+        return (RocketMQEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
+            return true;
+        }
+        try {
+            LOG.trace("Exchange Pattern {}", exchange.getPattern());
+            if (exchange.getPattern().isOutCapable()) {
+                return processInOut(exchange, callback);
+            } else {
+                return processInOnly(exchange, callback);
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback)
+            throws RemotingException, MQClientException, InterruptedException, NoTypeConversionAvailableException {
+        org.apache.camel.Message in = exchange.getIn();
+        Message message = new Message();
+        message.setTopic(in.getHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, () -> getEndpoint().getTopicName(), String.class));
+        message.setTags(in.getHeader(RocketMQConstants.OVERRIDE_TAG, () -> getEndpoint().getSendTag(), String.class));
+        message.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
+        message.setKeys(in.getHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, "", String.class));
+        initReplyManager();
+        String generateKey = GENERATE_MESSAGE_KEY_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
+        message.setKeys(Arrays.asList(Optional.ofNullable(message.getKeys()).orElse(""), generateKey));
+        LOG.debug("RocketMQ Producer sending {}", message);
+        mqProducer.send(message, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                    exchange.setException(new SendFailedException(sendResult.toString()));
+                    callback.done(false);

Review Comment:
   return;



##########
components/camel-rocketmq/src/main/java/org/apache/camel/component/rocketmq/RocketMQProducer.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rocketmq;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.rocketmq.reply.ReplyManager;
+import org.apache.camel.component.rocketmq.reply.RocketMQReplyManagerSupport;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocketMQProducer extends DefaultAsyncProducer {
+
+    public static final String GENERATE_MESSAGE_KEY_PREFIX = "camel-rocketmq-";
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQProducer.class);
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private DefaultMQProducer mqProducer;
+
+    private ReplyManager replyManager;
+
+    public RocketMQProducer(RocketMQEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public RocketMQEndpoint getEndpoint() {
+        return (RocketMQEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
+            return true;
+        }
+        try {
+            LOG.trace("Exchange Pattern {}", exchange.getPattern());
+            if (exchange.getPattern().isOutCapable()) {
+                return processInOut(exchange, callback);
+            } else {
+                return processInOnly(exchange, callback);
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback)
+            throws RemotingException, MQClientException, InterruptedException, NoTypeConversionAvailableException {
+        org.apache.camel.Message in = exchange.getIn();
+        Message message = new Message();
+        message.setTopic(in.getHeader(RocketMQConstants.OVERRIDE_TOPIC_NAME, () -> getEndpoint().getTopicName(), String.class));
+        message.setTags(in.getHeader(RocketMQConstants.OVERRIDE_TAG, () -> getEndpoint().getSendTag(), String.class));
+        message.setBody(exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, in.getBody()));
+        message.setKeys(in.getHeader(RocketMQConstants.OVERRIDE_MESSAGE_KEY, "", String.class));
+        initReplyManager();
+        String generateKey = GENERATE_MESSAGE_KEY_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
+        message.setKeys(Arrays.asList(Optional.ofNullable(message.getKeys()).orElse(""), generateKey));
+        LOG.debug("RocketMQ Producer sending {}", message);
+        mqProducer.send(message, new SendCallback() {
+
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                    exchange.setException(new SendFailedException(sendResult.toString()));
+                    callback.done(false);
+                }
+                if (replyManager == null) {
+                    LOG.warn("replyToTopic not set! Will not wait for reply.");
+                    callback.done(false);
+                    return;
+                }
+                replyManager.registerReply(replyManager, exchange, callback, generateKey,
+                        getEndpoint().getRequestTimeoutMillis());
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                replyManager.cancelMessageKey(generateKey);

Review Comment:
   maybe use try .. finally to ensure callback is called if replymanager for some strange reason throw exception as callback MUST be called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] davsclaus merged pull request #8820: CAMEL-14831: Add camel-rocketmq component

Posted by GitBox <gi...@apache.org>.
davsclaus merged PR #8820:
URL: https://github.com/apache/camel/pull/8820


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org