You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/21 12:37:18 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #391] Optimize
interface design in eventmesh-connector-api (#392)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new d39ea29 [ISSUE #391] Optimize interface design in eventmesh-connector-api (#392)
d39ea29 is described below
commit d39ea29317eaa9a8d6201fff1ff26f6da5593151
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Mon Jun 21 20:37:11 2021 +0800
[ISSUE #391] Optimize interface design in eventmesh-connector-api (#392)
* modify:optimize flow control in downstreaming msg
* modify:optimize strategy of selecting session in downstream msg
* modify:optimize msg downstream,msg store in session
* modify:fix bug:not a @Sharable handler
* modify:downstream broadcast msg asynchronously
* modify:remove unnecessary interface in eventmesh-connector-api
* modify:fix conflict
* modify:add license in EventMeshAction
close #391
---
...yncConsumeContext.java => EventMeshAction.java} | 15 ++-----
...text.java => EventMeshAsyncConsumeContext.java} | 24 +++++++----
.../eventmesh/api/consumer/MeshMQPushConsumer.java | 5 ---
.../eventmesh/api/producer/MeshMQProducer.java | 9 +---
.../rocketmq/consumer/PushConsumerImpl.java | 49 +++++++++++++++-------
.../rocketmq/producer/RocketMQProducerImpl.java | 17 +-------
.../rocketmq/consumer/PushConsumerImplTest.java | 4 +-
.../runtime/core/plugin/MQConsumerWrapper.java | 8 ----
.../protocol/http/consumer/EventMeshConsumer.java | 22 +++++-----
.../http/processor/SendSyncMessageProcessor.java | 2 +-
.../tcp/client/group/ClientGroupWrapper.java | 18 ++++----
.../core/protocol/tcp/client/session/Session.java | 3 +-
.../tcp/client/task/MessageTransferTask.java | 5 ---
13 files changed, 84 insertions(+), 97 deletions(-)
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
similarity index 71%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
copy to eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
index d6b2aa4..4fda6d0 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java
@@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.eventmesh.api;
-import io.openmessaging.api.AsyncConsumeContext;
-
-public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
- private AbstractContext context;
+public enum EventMeshAction {
+ CommitMessage,
- public AbstractContext getContext() {
- return context;
- }
+ ReconsumeLater,
- public void setContext(AbstractContext context) {
- this.context = context;
- }
+ ManualAck
}
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
similarity index 62%
rename from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
rename to eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
index d6b2aa4..c7e4e7f 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/MeshAsyncConsumeContext.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java
@@ -14,19 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.eventmesh.api;
+import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
-public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
- private AbstractContext context;
+public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {
+
+ private AbstractContext abstractContext;
- public AbstractContext getContext() {
- return context;
+ public AbstractContext getAbstractContext() {
+ return abstractContext;
}
- public void setContext(AbstractContext context) {
- this.context = context;
+ public void setAbstractContext(AbstractContext abstractContext) {
+ this.abstractContext = abstractContext;
+ }
+
+ public abstract void commit(EventMeshAction action);
+
+ @Override
+ public void commit(Action action) {
+ throw new UnsupportedOperationException("not support yet");
}
-}
+}
\ No newline at end of file
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
index 172a674..5e60e0e 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
@@ -30,11 +30,6 @@ public interface MeshMQPushConsumer extends Consumer {
void init(Properties keyValue) throws Exception;
- @Override
- void start();
-
-// void updateOffset(List<MessageExt> msgs, ConsumeConcurrentlyContext context);
-
void updateOffset(List<Message> msgs, AbstractContext context);
// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
index 929654f..82ca583 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
@@ -29,9 +29,6 @@ public interface MeshMQProducer extends Producer {
void init(Properties properties) throws Exception;
- @Override
- void start();
-
void send(Message message, SendCallback sendCallback) throws Exception;
void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
@@ -40,12 +37,8 @@ public interface MeshMQProducer extends Producer {
boolean reply(final Message message, final SendCallback sendCallback) throws Exception;
- MeshMQProducer getMeshMQProducer();
-
- String buildMQClientId();
+ void checkTopicExist(String topic) throws Exception;
void setExtFields();
- void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception;
-
}
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
index bbdbd86..47565e3 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java
@@ -21,9 +21,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.openmessaging.api.Action;
-import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncGenericMessageListener;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Consumer;
@@ -32,9 +29,8 @@ import io.openmessaging.api.Message;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.exception.OMSRuntimeException;
-
-import org.apache.eventmesh.api.AbstractContext;
-import org.apache.eventmesh.api.MeshAsyncConsumeContext;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfig;
@@ -116,13 +112,25 @@ public class PushConsumerImpl implements Consumer {
final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
- MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
+ EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
@Override
- public void commit(Action action) {
- contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ public void commit(EventMeshAction action) {
+ switch (action){
+ case CommitMessage:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ break;
+ case ReconsumeLater:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ break;
+ case ManualAck:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ break;
+ default:
+ break;
+ }
}
};
- omsContext.setContext(context);
+ omsContext.setAbstractContext(context);
listener.consume(omsMsg, omsContext);
return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
@@ -156,14 +164,25 @@ public class PushConsumerImpl implements Consumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
- MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
+ EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
@Override
- public void commit(Action action) {
- contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
- EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ public void commit(EventMeshAction action) {
+ switch (action) {
+ case CommitMessage:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ break;
+ case ReconsumeLater:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+ break;
+ case ManualAck:
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
+ break;
+ default:
+ break;
+ }
}
};
- omsContext.setContext(context);
+ omsContext.setAbstractContext(context);
listener.consume(omsMsg, omsContext);
return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
index 6364650..72e2093 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java
@@ -87,7 +87,6 @@ public class RocketMQProducerImpl implements MeshMQProducer {
producer.start();
}
-
@Override
public synchronized void shutdown() {
producer.shutdown();
@@ -115,13 +114,8 @@ public class RocketMQProducerImpl implements MeshMQProducer {
}
@Override
- public MeshMQProducer getMeshMQProducer() {
- return this;
- }
-
- @Override
- public String buildMQClientId() {
- return producer.getRocketmqProducer().buildMQClientId();
+ public void checkTopicExist(String topic) throws Exception {
+ this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}
@Override
@@ -130,13 +124,6 @@ public class RocketMQProducerImpl implements MeshMQProducer {
}
@Override
- public void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception {
- producer.getRocketmqProducer().getDefaultMQProducerImpl()
- .getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic,
- timeout);
- }
-
- @Override
public SendResult send(Message message) {
return producer.send(message);
}
diff --git a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java b/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
index 186ef4a..1140535 100644
--- a/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
+++ b/eventmesh-connector-rocketmq/src/test/java/org/apache/rocketmq/consumer/PushConsumerImplTest.java
@@ -32,6 +32,8 @@ import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.connector.rocketmq.consumer.PushConsumerImpl;
import org.apache.eventmesh.connector.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -95,7 +97,7 @@ public class PushConsumerImplTest {
public void consume(Message message, AsyncConsumeContext context) {
assertThat(message.getSystemProperties("MESSAGE_ID")).isEqualTo("NewMsgId");
assertThat(message.getBody()).isEqualTo(testBody);
- context.commit(Action.CommitMessage);
+ ((EventMeshAsyncConsumeContext)context).commit(EventMeshAction.CommitMessage);
}
});
((MessageListenerConcurrently) rocketmqPushConsumer
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index 39b2c63..080b7af 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -43,14 +43,6 @@ public class MQConsumerWrapper extends MQWrapper {
meshMQPushConsumer.unsubscribe(topic);
}
-// public boolean isPause() {
-// return meshMQPushConsumer.isPause();
-// }
-//
-// public void pause() {
-// meshMQPushConsumer.pause();
-// }
-
public synchronized void init(Properties keyValue) throws Exception {
meshMQPushConsumer = getMeshMQPushConsumer();
if (meshMQPushConsumer == null) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 8a1049b..8620e68 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -21,7 +21,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
-import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
@@ -31,7 +30,8 @@ import io.openmessaging.api.SendResult;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
-import org.apache.eventmesh.api.MeshAsyncConsumeContext;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
@@ -125,6 +125,7 @@ public class EventMeshConsumer {
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
@@ -132,18 +133,18 @@ public class EventMeshConsumer {
sendMessageBack(message, uniqueId, bizSeqNo);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+ topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(message, uniqueId, bizSeqNo);
@@ -152,7 +153,7 @@ public class EventMeshConsumer {
}
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
};
@@ -174,6 +175,7 @@ public class EventMeshConsumer {
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
@@ -181,18 +183,18 @@ public class EventMeshConsumer {
sendMessageBack(message, uniqueId, bizSeqNo);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+ topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(message, uniqueId, bizSeqNo);
@@ -201,7 +203,7 @@ public class EventMeshConsumer {
}
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
};
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 5ee12d3..5511e38 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -138,7 +138,7 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
omsMsg.putUserProperties("msgType", "persistent");
omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
- omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
+// omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index a297935..bbd62c0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.alibaba.fastjson.JSON;
-import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
@@ -40,7 +39,8 @@ import io.openmessaging.api.SendResult;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.api.MeshAsyncConsumeContext;
+import org.apache.eventmesh.api.EventMeshAction;
+import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
@@ -536,18 +536,19 @@ public class ClientGroupWrapper {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
logger.warn("found no session to downstream broadcast msg");
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}
Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
+ new DownStreamMsgContext(message, null, broadCastMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
while (sessionsItr.hasNext()) {
Session session = sessionsItr.next();
@@ -572,7 +573,7 @@ public class ClientGroupWrapper {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
}
};
broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
@@ -585,6 +586,7 @@ public class ClientGroupWrapper {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
if (session == null) {
@@ -614,18 +616,18 @@ public class ClientGroupWrapper {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
+ new DownStreamMsgContext(message, session, persistentMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
- context.commit(Action.CommitMessage);
+ eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
}
};
persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 8f4e72a..280ff3b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -166,8 +166,7 @@ public class Session {
sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
clientGroupWrapper.get().subscribe(item);
- clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(item.getTopic(),
- EventMeshConstants.DEFAULT_TIME_OUT_MILLS);
+ clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic());
clientGroupWrapper.get().addSubscription(item.getTopic(), this);
subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 2d4b80d..028214d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -84,11 +84,6 @@ public class MessageTransferTask extends AbstractTask {
synchronized (session) {
long sendTime = System.currentTimeMillis();
addTimestamp(eventMeshMessage, cmd, sendTime);
- if (cmd.equals(Command.REQUEST_TO_SERVER)) {
- //Message Attach SYNC
- eventMeshMessage.getProperties().put(EventMeshConstants.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper()
- .get().getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
- }
sendStatus = session.upstreamMsg(pkg.getHeader(), EventMeshUtil.decodeMessage(eventMeshMessage), createSendCallback(replyCmd, taskExecuteTime, eventMeshMessage), startTime, taskExecuteTime);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org