You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/03 13:59:47 UTC

[incubator-eventmesh] branch master updated: fix CI error: rocketmq connector

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 7475e20d fix CI error: rocketmq connector
     new ac2b56da Merge pull request #2105 from horoc/fix-ci-error-rocketmq
7475e20d is described below

commit 7475e20d459172b587b89051605fb214e052233a
Author: horoc <ho...@gmail.com>
AuthorDate: Thu Nov 3 20:01:07 2022 +0800

    fix CI error: rocketmq connector
---
 .../rocketmq/consumer/PushConsumerImpl.java        | 48 +++++++++++-----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
index 74d6c67d..8283628b 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
@@ -72,7 +72,7 @@ public class PushConsumerImpl {
         String consumerGroup = clientConfig.getConsumerId();
         if (null == consumerGroup || consumerGroup.isEmpty()) {
             throw new ConnectorRuntimeException(
-                    "Consumer Group is necessary for RocketMQ, please set it.");
+                "Consumer Group is necessary for RocketMQ, please set it.");
         }
         this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
         this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
@@ -80,7 +80,7 @@ public class PushConsumerImpl {
         this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
         this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
         this.rocketmqPushConsumer.setMessageModel(
-                MessageModel.valueOf(clientConfig.getMessageModel()));
+            MessageModel.valueOf(clientConfig.getMessageModel()));
 
         String consumerId = OMSUtil.buildInstanceName();
         //this.rocketmqPushConsumer.setInstanceName(consumerId);
@@ -149,16 +149,16 @@ public class PushConsumerImpl {
 
     public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
         ConsumeMessageService consumeMessageService = rocketmqPushConsumer
-                .getDefaultMQPushConsumerImpl().getConsumeMessageService();
+            .getDefaultMQPushConsumerImpl().getConsumeMessageService();
         List<MessageExt> msgExtList = new ArrayList<>(cloudEvents.size());
         for (CloudEvent msg : cloudEvents) {
-            if(msg != null) {
+            if (msg != null) {
                 msgExtList.add(CloudEventUtils.msgConvertExt(
-                         RocketMQMessageFactory.createWriter(msg.getSubject()).writeBinary(msg)));
+                    RocketMQMessageFactory.createWriter(msg.getSubject()).writeBinary(msg)));
             }
         }
         ((ConsumeMessageConcurrentlyService) consumeMessageService)
-                .updateOffset(msgExtList, (EventMeshConsumeConcurrentlyContext) context);
+            .updateOffset(msgExtList, (EventMeshConsumeConcurrentlyContext) context);
     }
 
 
@@ -172,13 +172,13 @@ public class PushConsumerImpl {
             }
 
             msg.putUserProperty(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP,
-                    String.valueOf(msg.getBornTimestamp()));
+                String.valueOf(msg.getBornTimestamp()));
             msg.putUserProperty(Constants.PROPERTY_MESSAGE_STORE_TIMESTAMP,
-                    String.valueOf(msg.getStoreTimestamp()));
+                String.valueOf(msg.getStoreTimestamp()));
 
             //for rr request/reply
             CloudEvent cloudEvent =
-                    RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
+                RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
 
             CloudEventBuilder cloudEventBuilder = null;
             for (String sysPropKey : MessageConst.STRING_HASH_SET) {
@@ -194,27 +194,27 @@ public class PushConsumerImpl {
 
             if (eventListener == null) {
                 throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
-                        msg.getTopic()));
+                    msg.getTopic()));
             }
 
             final Properties contextProperties = new Properties();
             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                    EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
             EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
                 @Override
                 public void commit(EventMeshAction action) {
                     switch (action) {
                         case CommitMessage:
                             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                             break;
                         case ReconsumeLater:
                             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
                             break;
                         case ManualAck:
                             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
                             break;
                         default:
                             break;
@@ -227,7 +227,7 @@ public class PushConsumerImpl {
             eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
 
             return EventMeshConsumeConcurrentlyStatus.valueOf(
-                    contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+                contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
         }
 
 
@@ -243,12 +243,12 @@ public class PushConsumerImpl {
             }
 
             msg.putUserProperty(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP,
-                    String.valueOf(msg.getBornTimestamp()));
+                String.valueOf(msg.getBornTimestamp()));
             msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP,
-                    String.valueOf(msg.getStoreTimestamp()));
+                String.valueOf(msg.getStoreTimestamp()));
 
             CloudEvent cloudEvent =
-                    RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
+                RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
 
             CloudEventBuilder cloudEventBuilder = null;
 
@@ -265,13 +265,13 @@ public class PushConsumerImpl {
 
             if (eventListener == null) {
                 throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
-                        msg.getTopic()));
+                    msg.getTopic()));
             }
 
             final Properties contextProperties = new Properties();
 
             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                    EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
 
             EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
                 @Override
@@ -279,15 +279,15 @@ public class PushConsumerImpl {
                     switch (action) {
                         case CommitMessage:
                             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
                             break;
                         case ReconsumeLater:
                             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+                                EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
                             break;
                         case ManualAck:
                             contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
-                                    EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+                                EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
                             break;
                         default:
                             break;
@@ -300,7 +300,7 @@ public class PushConsumerImpl {
             eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
 
             return EventMeshConsumeConcurrentlyStatus.valueOf(
-                    contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+                contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org