You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/24 09:07:57 UTC
[incubator-eventmesh] branch protocol-amqp updated: 1. define interface of downstreamDispatchStrategy 2. define a random dispatch strategy
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 302f4d0f 1. define interface of downstreamDispatchStrategy 2. define a random dispatch strategy
new 1916f3a3 Merge pull request #1799 from jackyluo-learning/protocol-amqp-shadow5
302f4d0f is described below
commit 302f4d0f13e4c82f2fc8dbb0604ca8cf5119d3f9
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Oct 24 11:53:56 2022 +0800
1. define interface of downstreamDispatchStrategy
2. define a random dispatch strategy
---
.../DownstreamDispatchStrategy.java | 12 +++++++
.../downstreamstrategy/RandomDispatchStrategy.java | 41 ++++++++++++++++++++++
2 files changed, 53 insertions(+)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/DownstreamDispatchStrategy.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/DownstreamDispatchStrategy.java
new file mode 100644
index 00000000..8754f682
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/DownstreamDispatchStrategy.java
@@ -0,0 +1,12 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.downstreamstrategy;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpConsumer;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.AmqpGlobalMapping;
+
+/**
+ * Strategy for choosing which consumer should receive a downstream message.
+ *
+ * <p>Despite the package naming around channels, the selection result is an
+ * {@link AmqpConsumer}, not an {@link AmqpChannel}.
+ */
+public interface DownstreamDispatchStrategy {
+ /**
+ * Selects one consumer for the given topic.
+ *
+ * @param topic             name used to look up candidate consumers in the mapping
+ * @param amqpGlobalMapping mapping object the candidates are read from
+ * @return the selected consumer; the random strategy added alongside this interface returns
+ *         {@code null} when no candidate is available, so callers should be prepared for
+ *         {@code null} (NOTE(review): confirm this is the intended contract for all strategies)
+ */
+ AmqpConsumer select(String topic, AmqpGlobalMapping amqpGlobalMapping);
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java
new file mode 100644
index 00000000..b6c30db3
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java
@@ -0,0 +1,41 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.downstreamstrategy;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpConsumer;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.AmqpGlobalMapping;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * randomly select channel
+ */
+public class RandomDispatchStrategy implements DownstreamDispatchStrategy {
+ @Override
+ public AmqpConsumer select(String topic, AmqpGlobalMapping amqpGlobalMapping) {
+ if (amqpGlobalMapping == null
+ || MapUtils.isEmpty(amqpGlobalMapping.getConnection2ChannelMap())
+ || MapUtils.isEmpty(amqpGlobalMapping.getQueue2ChannelMap())
+ || StringUtils.isBlank(topic)) {
+ return null;
+ }
+
+ ConcurrentHashMap<String, Set<AmqpConsumer>> queue2ChannelMap = amqpGlobalMapping.getQueue2ConsumerMap();
+ if (!queue2ChannelMap.containsKey(topic)) {
+ return null;
+ }
+
+ Set<AmqpConsumer> amqpConsumerSet = queue2ChannelMap.get(topic);
+ // TODO: 2022/10/19 Filter out invalid consumer
+
+ List<AmqpConsumer> amqpConsumerList = new ArrayList<>(amqpConsumerSet);
+ Collections.shuffle(amqpConsumerList);
+ return amqpConsumerList.get(0);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org