You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/10/10 16:36:01 UTC

[2/3] aries-jax-rs-whiteboard git commit: Adapt to new router interface

Adapt to new router interface


Project: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/commit/9b895c0d
Tree: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/tree/9b895c0d
Diff: http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/diff/9b895c0d

Branch: refs/heads/master
Commit: 9b895c0de252ec4379bcfc39cd7e6c81b2e6a312
Parents: 7c7b771
Author: Carlos Sierra <cs...@apache.org>
Authored: Wed Oct 4 14:26:11 2017 +0200
Committer: Carlos Sierra <cs...@apache.org>
Committed: Tue Oct 10 17:59:42 2017 +0200

----------------------------------------------------------------------
 .../aries/jax/rs/whiteboard/internal/Utils.java | 112 +++++++++----------
 1 file changed, 53 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-jax-rs-whiteboard/blob/9b895c0d/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
----------------------------------------------------------------------
diff --git a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
index c4967cc..b373e20 100644
--- a/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
+++ b/jax-rs.whiteboard/src/main/java/org/apache/aries/jax/rs/whiteboard/internal/Utils.java
@@ -19,6 +19,7 @@ package org.apache.aries.jax.rs.whiteboard.internal;
 
 import org.apache.aries.osgi.functional.Event;
 import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
 import org.apache.cxf.message.Message;
 import org.osgi.framework.ServiceObjects;
@@ -292,6 +293,7 @@ public class Utils {
 
         private final TreeSet<Event<T>> _set;
         private final Comparator<Event<T>> _comparator;
+        private SentEvent _sent;
 
         public HighestRankedRouter() {
             _comparator = Comparator.comparing(Event::getContent);
@@ -303,21 +305,11 @@ public class Utils {
         public void accept(OSGi.Router<T> router) {
             router.onIncoming(ev -> {
                 synchronized (_set) {
-                    Event<T> last = _set.size() > 0 ? _set.last() : null;
-
-                    boolean higher =
-                        (last == null) ||
-                            (_comparator.compare(ev, last) > 0);
-
-                    if (higher) {
-                        if (last != null) {
-                            router.signalLeave(last);
-                        }
+                    _set.add(ev);
 
-                        router.signalAdd(ev);
+                    if (ev == _set.last()) {
+                        _sent = router.signalAdd(ev);
                     }
-
-                    _set.add(ev);
                 }
             });
             router.onLeaving(ev -> {
@@ -326,35 +318,23 @@ public class Utils {
                         return;
                     }
 
-                    T content = ev.getContent();
-
-                    Event<T> last = _set.last();
-
-                    if (content.equals(last.getContent())) {
-                        router.signalLeave(ev);
+                    _set.remove(ev);
 
-                        Event<T> penultimate = _set.lower(ev);
-
-                        if (penultimate != null) {
-                            router.signalAdd(penultimate);
-                        }
+                    if (_sent != null && _sent.getEvent() == ev) {
+                        _sent = null;
                     }
 
-                    _set.removeIf(t -> content.equals(t.getContent()));
+                    if (!_set.isEmpty()) {
+                        _sent = router.signalAdd(_set.last());
+                    }
                 }
             });
             router.onClose(() -> {
-                synchronized (_set) {
-                    Iterator<Event<T>> iterator = _set.descendingIterator();
-
-                    while (iterator.hasNext()) {
-                        Event<T> event = iterator.next();
-
-                        router.signalLeave(event);
-
-                        iterator.remove();
-                    }
+                if (_sent != null) {
+                    _sent.terminate();
                 }
+
+                _sent = null;
             });
         }
 
@@ -368,6 +348,7 @@ public class Utils {
             new ConcurrentHashMap<>();
         private final Consumer<T> _onAddingShadowed;
         private final Consumer<T> _onRemovedShadowed;
+        private final Map<K, SentEvent<T>> _sentEvents = new HashMap<>();
 
         public HighestPerRouter(
             Function<T, K> keySupplier,
@@ -393,26 +374,26 @@ public class Utils {
                             set = new TreeSet<>(comparator);
                         }
 
-                        Event<T> last = set.size() > 0 ? set.last() : null;
+                        set.add(e);
+
+                        if (e == set.last()) {
+                            SentEvent<T> oldEvent = _sentEvents.get(key);
 
-                        boolean higher =
-                            (last == null) ||
-                                (comparator.compare(e, last) > 0);
+                            if (oldEvent != null) {
+                                _sentEvents.remove(key);
 
-                        if (higher) {
-                            if (last != null) {
-                                router.signalLeave(last);
+                                oldEvent.terminate();
 
-                                _onAddingShadowed.accept(last.getContent());
+                                _onAddingShadowed.accept(
+                                    oldEvent.getEvent().getContent());
                             }
 
-                            router.signalAdd(e);
-                        } else {
+                            _sentEvents.put(key, router.signalAdd(e));
+                        }
+                        else {
                             _onAddingShadowed.accept(e.getContent());
                         }
 
-                        set.add(e);
-
                         return set;
                     });
             });
@@ -428,29 +409,42 @@ public class Utils {
                             return set;
                         }
 
-                        Event<T> last = set.last();
+                        set.remove(e);
 
-                        if (content.equals(last.getContent())) {
-                            router.signalLeave(e);
+                        _sentEvents.compute(key, (___, sentEvent) -> {
+                            if (sentEvent.getEvent() == e) {
+                                if (!set.isEmpty()) {
+                                    Event<T> last = set.last();
 
-                            Event<T> penultimate = set.lower(last);
+                                    SentEvent<T> event = router.signalAdd(last);
 
-                            if (penultimate != null) {
-                                router.signalAdd(penultimate);
+                                    _onRemovedShadowed.accept(
+                                        last.getContent());
 
-                                _onRemovedShadowed.accept(
-                                    penultimate.getContent());
+                                    return event;
+                                }
+
+                                return null;
                             }
-                        } else {
-                            _onRemovedShadowed.accept(content);
-                        }
+                            else {
+                                _onRemovedShadowed.accept(e.getContent());
+                            }
+
+                            return sentEvent;
+                        });
+
 
-                        set.removeIf(t -> t.getContent().equals(content));
 
                         return set;
                     }
                 );
             });
+
+            router.onClose(() -> {
+                _sentEvents.values().forEach(SentEvent::terminate);
+
+                _sentEvents.clear();
+            });
         }
 
     }