You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/03 13:59:47 UTC
[incubator-eventmesh] branch master updated: fix CI error: rocketmq connector
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 7475e20d fix CI error: rocketmq connector
new ac2b56da Merge pull request #2105 from horoc/fix-ci-error-rocketmq
7475e20d is described below
commit 7475e20d459172b587b89051605fb214e052233a
Author: horoc <ho...@gmail.com>
AuthorDate: Thu Nov 3 20:01:07 2022 +0800
fix CI error: rocketmq connector
---
.../rocketmq/consumer/PushConsumerImpl.java | 48 +++++++++++-----------
1 file changed, 24 insertions(+), 24 deletions(-)
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
index 74d6c67d..8283628b 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
@@ -72,7 +72,7 @@ public class PushConsumerImpl {
String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new ConnectorRuntimeException(
- "Consumer Group is necessary for RocketMQ, please set it.");
+ "Consumer Group is necessary for RocketMQ, please set it.");
}
this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
@@ -80,7 +80,7 @@ public class PushConsumerImpl {
this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
this.rocketmqPushConsumer.setMessageModel(
- MessageModel.valueOf(clientConfig.getMessageModel()));
+ MessageModel.valueOf(clientConfig.getMessageModel()));
String consumerId = OMSUtil.buildInstanceName();
//this.rocketmqPushConsumer.setInstanceName(consumerId);
@@ -149,16 +149,16 @@ public class PushConsumerImpl {
public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {
ConsumeMessageService consumeMessageService = rocketmqPushConsumer
- .getDefaultMQPushConsumerImpl().getConsumeMessageService();
+ .getDefaultMQPushConsumerImpl().getConsumeMessageService();
List<MessageExt> msgExtList = new ArrayList<>(cloudEvents.size());
for (CloudEvent msg : cloudEvents) {
- if(msg != null) {
+ if (msg != null) {
msgExtList.add(CloudEventUtils.msgConvertExt(
- RocketMQMessageFactory.createWriter(msg.getSubject()).writeBinary(msg)));
+ RocketMQMessageFactory.createWriter(msg.getSubject()).writeBinary(msg)));
}
}
((ConsumeMessageConcurrentlyService) consumeMessageService)
- .updateOffset(msgExtList, (EventMeshConsumeConcurrentlyContext) context);
+ .updateOffset(msgExtList, (EventMeshConsumeConcurrentlyContext) context);
}
@@ -172,13 +172,13 @@ public class PushConsumerImpl {
}
msg.putUserProperty(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP,
- String.valueOf(msg.getBornTimestamp()));
+ String.valueOf(msg.getBornTimestamp()));
msg.putUserProperty(Constants.PROPERTY_MESSAGE_STORE_TIMESTAMP,
- String.valueOf(msg.getStoreTimestamp()));
+ String.valueOf(msg.getStoreTimestamp()));
//for rr request/reply
CloudEvent cloudEvent =
- RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
+ RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
CloudEventBuilder cloudEventBuilder = null;
for (String sysPropKey : MessageConst.STRING_HASH_SET) {
@@ -194,27 +194,27 @@ public class PushConsumerImpl {
if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
- msg.getTopic()));
+ msg.getTopic()));
}
final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
break;
case ReconsumeLater:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
break;
case ManualAck:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
break;
default:
break;
@@ -227,7 +227,7 @@ public class PushConsumerImpl {
eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
return EventMeshConsumeConcurrentlyStatus.valueOf(
- contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+ contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
@@ -243,12 +243,12 @@ public class PushConsumerImpl {
}
msg.putUserProperty(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP,
- String.valueOf(msg.getBornTimestamp()));
+ String.valueOf(msg.getBornTimestamp()));
msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP,
- String.valueOf(msg.getStoreTimestamp()));
+ String.valueOf(msg.getStoreTimestamp()));
CloudEvent cloudEvent =
- RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
+ RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent();
CloudEventBuilder cloudEventBuilder = null;
@@ -265,13 +265,13 @@ public class PushConsumerImpl {
if (eventListener == null) {
throw new ConnectorRuntimeException(String.format("The topic/queue %s isn't attached to this consumer",
- msg.getTopic()));
+ msg.getTopic()));
}
final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = new EventMeshAsyncConsumeContext() {
@Override
@@ -279,15 +279,15 @@ public class PushConsumerImpl {
switch (action) {
case CommitMessage:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
break;
case ReconsumeLater:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
break;
case ManualAck:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
break;
default:
break;
@@ -300,7 +300,7 @@ public class PushConsumerImpl {
eventListener.consume(cloudEvent, eventMeshAsyncConsumeContext);
return EventMeshConsumeConcurrentlyStatus.valueOf(
- contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+ contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org