You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2017/06/20 06:48:16 UTC

[1/3] cxf git commit: [CXF-7414] Don't resume if we know it's not the next message

Repository: cxf
Updated Branches:
  refs/heads/3.1.x-fixes 35ae5ea6c -> 78042fae0


[CXF-7414] Don't resume if we know it's not the next message


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/78042fae
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/78042fae
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/78042fae

Branch: refs/heads/3.1.x-fixes
Commit: 78042fae046a7b32ffbe2c16ef2eb72f3c28bfc6
Parents: b809337
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Jun 19 21:55:43 2017 -0400
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/cxf/ws/rm/DestinationSequence.java    | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/78042fae/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 7ac5128..a11eb67 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -406,11 +406,7 @@ public class DestinationSequence extends AbstractSequence {
                 c.resume();
                 return;
             }
-            //next wasn't found, just resume whatever is first...
-            for (Map.Entry<Long, Continuation> entry : continuations.entrySet()) {
-                entry.getValue().resume();
-                return;
-            }
+            //next wasn't found, nothing to resume, assume it will come in later...
         } finally {
             notifyAll();
         }


[3/3] cxf git commit: [CXF-7414] Test and fix for ws-rm out of order issue. Thanks to Dan

Posted by cs...@apache.org.
[CXF-7414] Test and fix for ws-rm out of order issue. Thanks to Dan


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0a6ff4cb
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0a6ff4cb
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0a6ff4cb

Branch: refs/heads/3.1.x-fixes
Commit: 0a6ff4cb91bbb950f66af9d95511ffcb5ee9d9fb
Parents: 35ae5ea
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Mon Jun 19 17:34:19 2017 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/DestinationSequence.java   |  5 +-
 .../ws/rm/DeliveryAssuranceOnewayTest.java      | 40 +++++++++++
 .../ws/rm/SingleMessageDelaySimulator.java      | 75 ++++++++++++++++++++
 .../cxf/systest/ws/rm/atleastonce-inorder.xml   | 40 +++++++++++
 4 files changed, 157 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index d186194..87c7ecb 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -397,9 +397,8 @@ public class DestinationSequence extends AbstractSequence {
         }
     }
     synchronized void wakeupAll() {
-        while (!continuations.isEmpty()) {
-            Continuation c = continuations.remove(0);
-            c.resume();
+        if (!continuations.isEmpty()) {
+            continuations.remove(0).resume();
         }
         notifyAll();
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
index ee9b45f..6fd8f7b 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
@@ -25,7 +25,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import javax.jws.WebService;
@@ -236,6 +238,44 @@ public class DeliveryAssuranceOnewayTest extends AbstractBusClientServerTestBase
             assertTrue("Message out of order", argNum < callArgs.length);
         }
     }
+    
+    @Test
+    public void testOnewayAtLeastOnceInOrderDelay() throws Exception {
+        int numMessages = 4;
+        init("org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml", null);
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        SingleMessageDelaySimulator sps = new SingleMessageDelaySimulator();
+        sps.setDelay(600L);
+        greeterBus.getOutInterceptors().add(sps);
+        int num = 1;
+        greeter.greetMe(Integer.toString(num++));
+        for (int c = 2; c <= numMessages; c++) {
+            int currentNum = num++;
+            Thread.sleep(100);
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    greeter.greetMe(Integer.toString(currentNum));
+                }
+            });
+        }
+        executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+        LOG.info("Waiting for " + numMessages + " messages to arrive");
+        awaitMessages(numMessages, 1000, 10000);
+        List<String> actualArgs = GreeterProvider.CALL_ARGS;
+        assertEquals("Some messages were not received", numMessages, actualArgs.size());
+        assertInOrder(actualArgs);
+    }
+
+    private void assertInOrder(List<String> actualArgs) {
+        int argNum = 0;
+        for (String actual : actualArgs) {
+            argNum++;
+            assertEquals(Integer.toString(argNum), actual);
+        }
+    }
 
     @Test    
     public void testAtMostOnceInOrder() throws Exception {

http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java
new file mode 100644
index 0000000..369efa5
--- /dev/null
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+
+/**
+ * Delay a single message
+ */
+public class SingleMessageDelaySimulator extends AbstractPhaseInterceptor<Message> {
+    private static final Logger LOG = LogUtils.getLogger(SingleMessageDelaySimulator.class);
+    
+    private long delay = 10000L;
+    private long messageNumber = 2;
+    
+    public SingleMessageDelaySimulator() {
+        this(Phase.USER_PROTOCOL);
+    }
+    
+    public SingleMessageDelaySimulator(String p) {
+        super(p);
+    }
+    
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+    
+    public void setMessageNumber(long messageNumber) {
+        this.messageNumber = messageNumber;
+    }
+
+    public void handleMessage(Message message) throws Fault {
+        RMProperties rmProps = (RMProperties)message.get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
+        if (rmProps != null && rmProps.getMessageNumber()  == messageNumber) {
+            sleep();
+        }
+    }
+
+    private void sleep() {
+        LOG.log(Level.INFO, "sleeping " + delay + " msec ...");
+        try {
+            Thread.sleep(delay);
+            LOG.log(Level.INFO, "continuing");
+        } catch (InterruptedException e) {
+            LOG.log(Level.INFO, "interrupted");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml
new file mode 100644
index 0000000..064c044
--- /dev/null
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+        Licensed to the Apache Software Foundation (ASF) under one
+        or more contributor license agreements. See the NOTICE file
+        distributed with this work for additional information
+        regarding copyright ownership. The ASF licenses this file
+        to you under the Apache License, Version 2.0 (the
+        "License"); you may not use this file except in compliance
+        with the License. You may obtain a copy of the License at
+        
+        http://www.apache.org/licenses/LICENSE-2.0
+        
+        Unless required by applicable law or agreed to in writing,
+        software distributed under the License is distributed on an
+        "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+        KIND, either express or implied. See the License for the
+        specific language governing permissions and limitations
+        under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager" xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ 
+ xsi:schemaLocation=" http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
+ http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/transports/http/configuration"
+>
+    <import resource="rminterceptors.xml"/>
+
+    <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+        <wsrm-policy:RMAssertion>
+            <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000"/>
+            <wsrm-policy:AcknowledgementInterval Milliseconds="10000"/>
+        </wsrm-policy:RMAssertion>
+        <wsrm-mgr:deliveryAssurance>
+            <wsrm-mgr:AtLeastOnce/>
+            <wsrm-mgr:InOrder/>
+        </wsrm-mgr:deliveryAssurance>
+        <wsrm-mgr:destinationPolicy>
+            <wsrm-mgr:acksPolicy intraMessageThreshold="0"/>
+        </wsrm-mgr:destinationPolicy>
+    </wsrm-mgr:rmManager>
+</beans>


[2/3] cxf git commit: [CXF-7414] More updates to resume the "next" message instead of whichever was first in teh continuation list

Posted by cs...@apache.org.
[CXF-7414] More updates to resume the "next" message instead of whichever was first in teh continuation list


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/b8093370
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/b8093370
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/b8093370

Branch: refs/heads/3.1.x-fixes
Commit: b80933704fdd12e9e7c872f9ffbae2d554b2b18d
Parents: 0a6ff4c
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Jun 19 11:49:03 2017 -0400
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/DestinationSequence.java   | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/b8093370/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 87c7ecb..7ac5128 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -24,10 +24,11 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -67,7 +68,8 @@ public class DestinationSequence extends AbstractSequence {
     private volatile long inProcessNumber;
     private volatile long highNumberCompleted;
     private long nextInOrder;
-    private List<Continuation> continuations = new LinkedList<Continuation>();
+    //be careful, must be used in sync block
+    private Map<Long, Continuation> continuations = new TreeMap<Long, Continuation>();
     // this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd messages
     private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
     
@@ -383,7 +385,7 @@ public class DestinationSequence extends AbstractSequence {
             if (continuation != null) {
                 continuation.setObject(message);
                 if (continuation.suspend(-1)) {
-                    continuations.add(continuation);
+                    continuations.put(mn, continuation);
                     throw new SuspendedInvocationException();
                 }
             }
@@ -396,17 +398,28 @@ public class DestinationSequence extends AbstractSequence {
             }
         }
     }
-    synchronized void wakeupAll() {
-        if (!continuations.isEmpty()) {
-            continuations.remove(0).resume();
+    synchronized void wakeupNext(long i) {
+        try {
+            Continuation c = continuations.remove(i + 1);
+            if (c != null) {
+                //next was found, don't resume everything, just the next one
+                c.resume();
+                return;
+            }
+            //next wasn't found, just resume whatever is first...
+            for (Map.Entry<Long, Continuation> entry : continuations.entrySet()) {
+                entry.getValue().resume();
+                return;
+            }
+        } finally {
+            notifyAll();
         }
-        notifyAll();
     }
     
     synchronized void processingComplete(long mn) {
         inProcessNumber = 0;
         highNumberCompleted = mn;
-        wakeupAll();
+        wakeupNext(mn);
     }
     
     void purgeAcknowledged(long messageNr) {