You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/06 08:05:32 UTC

[incubator-eventmesh] branch protocol-amqp updated: realization TopicParser

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 009f3cce realization TopicParser
     new d3d06de3 Merge pull request #2124 from wangshaojie4039/protocol-amqp
009f3cce is described below

commit 009f3cce1598a9c7f9fa3ccf31192365abda7d1b
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Sat Nov 5 14:58:54 2022 +0800

    realization TopicParser
---
 .../protocol/amqp/exchange/topic/TopicParser.java  | 356 +++++++++++++++++++++
 1 file changed, 356 insertions(+)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exchange/topic/TopicParser.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exchange/topic/TopicParser.java
new file mode 100644
index 00000000..77f8d32d
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exchange/topic/TopicParser.java
@@ -0,0 +1,356 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.eventmesh.runtime.core.protocol.amqp.exchange.topic;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TopicParser {
+    private static final String TOPIC_DELIMITER = "\\.";
+
+    private final TopicWordDictionary _dictionary = new TopicWordDictionary();
+    private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<>();
+
+    private static class Position {
+        private final TopicWord _word;
+        private final boolean _selfTransition;
+        private final int _position;
+        private final boolean _endState;
+        private boolean _followedByAnyLoop;
+
+        private Position(final int position, final TopicWord word, final boolean selfTransition,
+            final boolean endState) {
+            _position = position;
+            _word = word;
+            _selfTransition = selfTransition;
+            _endState = endState;
+        }
+
+        private TopicWord getWord() {
+            return _word;
+        }
+
+        private boolean isSelfTransition() {
+            return _selfTransition;
+        }
+
+        private int getPosition() {
+            return _position;
+        }
+
+        private boolean isEndState() {
+            return _endState;
+        }
+
+        private boolean isFollowedByAnyLoop() {
+            return _followedByAnyLoop;
+        }
+
+        private void setFollowedByAnyLoop(boolean followedByAnyLoop) {
+            _followedByAnyLoop = followedByAnyLoop;
+        }
+    }
+
+    private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE, null, true, false);
+
+    private static class SimpleState {
+        private Set<Position> _positions;
+        private Map<TopicWord, SimpleState> _nextState;
+    }
+
+    public void addBinding(String bindingKey, TopicMatcherResult result) {
+
+        TopicMatcherDFAState startingStateMachine;
+        TopicMatcherDFAState newStateMachine;
+
+        do {
+            startingStateMachine = _stateMachine.get();
+            if (startingStateMachine == null) {
+                newStateMachine = createStateMachine(bindingKey, result);
+            } else {
+                newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
+            }
+
+        }
+        while (!_stateMachine.compareAndSet(startingStateMachine, newStateMachine));
+
+    }
+
+    public Collection<TopicMatcherResult> parse(String routingKey) {
+        TopicMatcherDFAState stateMachine = _stateMachine.get();
+        if (stateMachine == null) {
+            return Collections.emptySet();
+        } else {
+            return stateMachine.parse(_dictionary, routingKey);
+        }
+    }
+
+    private TopicMatcherDFAState createStateMachine(String bindingKey, TopicMatcherResult result) {
+        List<TopicWord> wordList = createTopicWordList(bindingKey);
+        int wildCards = 0;
+        for (TopicWord word : wordList) {
+            if (word == TopicWord.WILDCARD_WORD) {
+                wildCards++;
+            }
+        }
+        if (wildCards == 0) {
+            TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size() + 1];
+            states[states.length - 1] = new TopicMatcherDFAState(Collections.emptyMap(), Collections.singleton(result));
+            for (int i = states.length - 2; i >= 0; i--) {
+                states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i), states[i + 1]), Collections.emptySet());
+
+            }
+            return states[0];
+        } else if (wildCards == wordList.size()) {
+            Map<TopicWord, TopicMatcherDFAState> stateMap = new HashMap<>();
+            TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
+            stateMap.put(TopicWord.ANY_WORD, state);
+            return state;
+        }
+
+        int positionCount = wordList.size() - wildCards;
+
+        Position[] positions = new Position[positionCount + 1];
+
+        int lastWord;
+
+        if (wordList.get(wordList.size() - 1) == TopicWord.WILDCARD_WORD) {
+            lastWord = wordList.size() - 1;
+            positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true);
+        } else {
+            lastWord = wordList.size();
+            positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true);
+        }
+
+        int pos = 0;
+        int wordPos = 0;
+
+        while (wordPos < lastWord) {
+            TopicWord word = wordList.get(wordPos++);
+
+            if (word == TopicWord.WILDCARD_WORD) {
+                int nextWordPos = wordPos++;
+                word = wordList.get(nextWordPos);
+
+                positions[pos] = new Position(pos++, word, true, false);
+            } else {
+                positions[pos] = new Position(pos++, word, false, false);
+            }
+
+        }
+
+        for (int p = 0; p < positionCount; p++) {
+            boolean followedByWildcards = true;
+
+            int n = p;
+            while (followedByWildcards && n < (positionCount + 1)) {
+
+                if (positions[n].isSelfTransition()) {
+                    break;
+                } else if (positions[n].getWord() != TopicWord.ANY_WORD) {
+                    followedByWildcards = false;
+                }
+                n++;
+            }
+
+            positions[p].setFollowedByAnyLoop(followedByWildcards && (n != positionCount + 1));
+        }
+
+        // from each position you transition to a set of other positions.
+        // we approach this by examining steps of increasing length - so we
+        // look how far we can go from the start position in 1 word, 2 words, etc...
+
+        Map<Set<Position>, SimpleState> stateMap = new HashMap<>();
+
+        SimpleState state = new SimpleState();
+        state._positions = Collections.singleton(positions[0]);
+        stateMap.put(state._positions, state);
+
+        calculateNextStates(state, stateMap, positions);
+
+        SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
+        HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
+        Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<>();
+
+        for (int i = 0; i < simpleStates.length; i++) {
+
+            Collection<TopicMatcherResult> results;
+            boolean endState = false;
+
+            for (Position p : simpleStates[i]._positions) {
+                if (p.isEndState()) {
+                    endState = true;
+                    break;
+                }
+            }
+
+            if (endState) {
+                results = Collections.singleton(result);
+            } else {
+                results = Collections.emptySet();
+            }
+
+            dfaStateMaps[i] = new HashMap<>();
+            simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i], results));
+
+        }
+        for (int i = 0; i < simpleStates.length; i++) {
+            SimpleState simpleState = simpleStates[i];
+
+            Map<TopicWord, SimpleState> nextSimpleStateMap = simpleState._nextState;
+            for (Map.Entry<TopicWord, SimpleState> stateMapEntry : nextSimpleStateMap.entrySet()) {
+                dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue()));
+            }
+
+        }
+
+        return simple2DFAMap.get(state);
+
+    }
+
+    private void calculateNextStates(final SimpleState state,
+        final Map<Set<Position>, SimpleState> stateMap,
+        final Position[] positions) {
+        Map<TopicWord, Set<Position>> transitions = new HashMap<>();
+
+        for (Position pos : state._positions) {
+            if (pos.isSelfTransition()) {
+                Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
+                if (dest == null) {
+                    dest = new HashSet<>();
+                    transitions.put(TopicWord.ANY_WORD, dest);
+                }
+                dest.add(pos);
+            }
+
+            final int nextPos = pos.getPosition() + 1;
+            Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos];
+
+            Set<Position> dest = transitions.get(pos.getWord());
+            if (dest == null) {
+                dest = new HashSet<>();
+                transitions.put(pos.getWord(), dest);
+            }
+            dest.add(nextPosition);
+
+        }
+
+        Set<Position> anyWordTransitions = transitions.get(TopicWord.ANY_WORD);
+        if (anyWordTransitions != null) {
+            for (Set<Position> dest : transitions.values()) {
+                dest.addAll(anyWordTransitions);
+            }
+        }
+
+        state._nextState = new HashMap<>();
+
+        for (Map.Entry<TopicWord, Set<Position>> dest : transitions.entrySet()) {
+
+            if (dest.getValue().size() > 1) {
+                dest.getValue().remove(ERROR_POSITION);
+            }
+            Position loopingTerminal = null;
+            for (Position destPos : dest.getValue()) {
+                if (destPos.isSelfTransition() && destPos.isEndState()) {
+                    loopingTerminal = destPos;
+                    break;
+                }
+            }
+
+            if (loopingTerminal != null) {
+                dest.setValue(Collections.singleton(loopingTerminal));
+            } else {
+                Position anyLoop = null;
+                for (Position destPos : dest.getValue()) {
+                    if (destPos.isFollowedByAnyLoop()) {
+                        if (anyLoop == null || anyLoop.getPosition() < destPos.getPosition()) {
+                            anyLoop = destPos;
+                        }
+                    }
+                }
+                if (anyLoop != null) {
+                    Collection<Position> removals = new ArrayList<>();
+                    for (Position destPos : dest.getValue()) {
+                        if (destPos.getPosition() < anyLoop.getPosition()) {
+                            removals.add(destPos);
+                        }
+                    }
+                    dest.getValue().removeAll(removals);
+                }
+            }
+
+            SimpleState stateForEntry = stateMap.get(dest.getValue());
+            if (stateForEntry == null) {
+                stateForEntry = new SimpleState();
+                stateForEntry._positions = dest.getValue();
+                stateMap.put(dest.getValue(), stateForEntry);
+                calculateNextStates(stateForEntry,
+                    stateMap,
+                    positions);
+            }
+            state._nextState.put(dest.getKey(), stateForEntry);
+
+        }
+
+        // remove redundant transitions
+        SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
+        if (anyWordState != null) {
+            List<TopicWord> removeList = new ArrayList<>();
+            for (Map.Entry<TopicWord, SimpleState> entry : state._nextState.entrySet()) {
+                if (entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD) {
+                    removeList.add(entry.getKey());
+                }
+            }
+            for (TopicWord removeKey : removeList) {
+                state._nextState.remove(removeKey);
+            }
+        }
+
+    }
+
+    private List<TopicWord> createTopicWordList(final String bindingKey) {
+        String[] tokens = bindingKey.split(TOPIC_DELIMITER);
+        TopicWord previousWord = null;
+
+        List<TopicWord> wordList = new ArrayList<>();
+
+        for (String token : tokens) {
+            TopicWord nextWord = _dictionary.getOrCreateWord(token);
+            if (previousWord == TopicWord.WILDCARD_WORD) {
+
+                if (nextWord == TopicWord.WILDCARD_WORD) {
+                    // consecutive wildcards can be merged
+                    // i.e. subsequent wildcards can be discarded
+                    continue;
+                } else if (nextWord == TopicWord.ANY_WORD) {
+                    // wildcard and anyword can be reordered to always put anyword first
+                    wordList.set(wordList.size() - 1, TopicWord.ANY_WORD);
+                    nextWord = TopicWord.WILDCARD_WORD;
+                }
+            }
+            wordList.add(nextWord);
+            previousWord = nextWord;
+
+        }
+        return wordList;
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org