You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/24 09:07:57 UTC

[incubator-eventmesh] branch protocol-amqp updated: 1. define interface of downstreamDispatchStrategy 2. define a random dispatch strategy

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 302f4d0f 1. define interface of downstreamDispatchStrategy 2. define a random dispatch strategy
     new 1916f3a3 Merge pull request #1799 from jackyluo-learning/protocol-amqp-shadow5
302f4d0f is described below

commit 302f4d0f13e4c82f2fc8dbb0604ca8cf5119d3f9
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Oct 24 11:53:56 2022 +0800

    1. define interface of downstreamDispatchStrategy
    2. define a random dispatch strategy
---
 .../DownstreamDispatchStrategy.java                | 12 +++++++
 .../downstreamstrategy/RandomDispatchStrategy.java | 41 ++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/DownstreamDispatchStrategy.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/DownstreamDispatchStrategy.java
new file mode 100644
index 00000000..8754f682
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/DownstreamDispatchStrategy.java
@@ -0,0 +1,12 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.downstreamstrategy;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpConsumer;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.AmqpGlobalMapping;
+
+/**
+ * Strategy used to choose the consumer that a downstream message for a topic is dispatched to.
+ */
+public interface DownstreamDispatchStrategy {
+    /**
+     * Selects one consumer for the given topic.
+     *
+     * @param topic             topic to look up a consumer for
+     * @param amqpGlobalMapping mapping object the consumer is looked up in
+     * @return the chosen consumer; implementations may return {@code null} when no consumer
+     *         can be selected (RandomDispatchStrategy does so), so callers should check for it
+     */
+    AmqpConsumer select(String topic, AmqpGlobalMapping amqpGlobalMapping);
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java
new file mode 100644
index 00000000..b6c30db3
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java
@@ -0,0 +1,41 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.downstreamstrategy;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpConsumer;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.AmqpGlobalMapping;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Dispatch strategy that picks one of the consumers registered for a topic at random.
+ */
+public class RandomDispatchStrategy implements DownstreamDispatchStrategy {
+
+    /**
+     * Selects a random consumer registered for the given topic.
+     *
+     * @param topic             key used to look up the consumer set; blank values yield {@code null}
+     * @param amqpGlobalMapping mapping object that holds the queue-to-consumer map
+     * @return a randomly chosen consumer, or {@code null} when the arguments are unusable or
+     *         no consumer is currently registered for the topic
+     */
+    @Override
+    public AmqpConsumer select(String topic, AmqpGlobalMapping amqpGlobalMapping) {
+        if (amqpGlobalMapping == null
+                || MapUtils.isEmpty(amqpGlobalMapping.getConnection2ChannelMap())
+                || MapUtils.isEmpty(amqpGlobalMapping.getQueue2ChannelMap())
+                || StringUtils.isBlank(topic)) {
+            return null;
+        }
+
+        // The guards above do not cover the consumer map itself, so check it before use.
+        ConcurrentHashMap<String, Set<AmqpConsumer>> queue2ConsumerMap = amqpGlobalMapping.getQueue2ConsumerMap();
+        if (queue2ConsumerMap == null) {
+            return null;
+        }
+
+        // Single lookup instead of containsKey() + get(): the entry could be removed between
+        // the two calls, which would hand a null set to the ArrayList constructor.
+        Set<AmqpConsumer> amqpConsumerSet = queue2ConsumerMap.get(topic);
+        if (amqpConsumerSet == null) {
+            return null;
+        }
+        // TODO: 2022/10/19 Filter out invalid consumer
+
+        // Work on a snapshot and test the snapshot for emptiness, so an empty set
+        // results in null instead of an IndexOutOfBoundsException from get(0).
+        List<AmqpConsumer> amqpConsumerList = new ArrayList<>(amqpConsumerSet);
+        if (amqpConsumerList.isEmpty()) {
+            return null;
+        }
+        Collections.shuffle(amqpConsumerList);
+        return amqpConsumerList.get(0);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org