You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2017/06/19 15:49:28 UTC

cxf git commit: [CXF-7414] More updates to resume the "next" message instead of whichever was first in teh continuation list

Repository: cxf
Updated Branches:
  refs/heads/master f00e87a45 -> df47c2d60


[CXF-7414] More updates to resume the "next" message instead of whichever was first in teh continuation list


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/df47c2d6
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/df47c2d6
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/df47c2d6

Branch: refs/heads/master
Commit: df47c2d60100ac528b12f862166e3ac59d72f751
Parents: f00e87a
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Jun 19 11:49:03 2017 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Mon Jun 19 11:49:03 2017 -0400

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/DestinationSequence.java   | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/df47c2d6/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index c7d5ae3..59327cb 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -24,10 +24,11 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -67,7 +68,8 @@ public class DestinationSequence extends AbstractSequence {
     private volatile long inProcessNumber;
     private volatile long highNumberCompleted;
     private long nextInOrder;
-    private List<Continuation> continuations = new LinkedList<Continuation>();
+    //be careful, must be used in sync block
+    private Map<Long, Continuation> continuations = new TreeMap<Long, Continuation>();
     // this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd messages
     private Set<Long> deliveringMessageNumbers = new HashSet<>();
 
@@ -383,7 +385,7 @@ public class DestinationSequence extends AbstractSequence {
             if (continuation != null) {
                 continuation.setObject(message);
                 if (continuation.suspend(-1)) {
-                    continuations.add(continuation);
+                    continuations.put(mn, continuation);
                     throw new SuspendedInvocationException();
                 }
             }
@@ -396,17 +398,28 @@ public class DestinationSequence extends AbstractSequence {
             }
         }
     }
-    synchronized void wakeupAll() {
-        if (!continuations.isEmpty()) {
-            continuations.remove(0).resume();
+    synchronized void wakeupNext(long i) {
+        try {
+            Continuation c = continuations.remove(i + 1);
+            if (c != null) {
+                //next was found, don't resume everything, just the next one
+                c.resume();
+                return;
+            }
+            //next wasn't found, just resume whatever is first...
+            for (Map.Entry<Long, Continuation> entry : continuations.entrySet()) {
+                entry.getValue().resume();
+                return;
+            }
+        } finally {
+            notifyAll();
         }
-        notifyAll();
     }
 
     synchronized void processingComplete(long mn) {
         inProcessNumber = 0;
         highNumberCompleted = mn;
-        wakeupAll();
+        wakeupNext(mn);
     }
 
     void purgeAcknowledged(long messageNr) {