You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2017/06/20 06:48:16 UTC
[1/3] cxf git commit: [CXF-7414] Don't resume if we know it's not the
next message
Repository: cxf
Updated Branches:
refs/heads/3.1.x-fixes 35ae5ea6c -> 78042fae0
[CXF-7414] Don't resume if we know it's not the next message
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/78042fae
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/78042fae
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/78042fae
Branch: refs/heads/3.1.x-fixes
Commit: 78042fae046a7b32ffbe2c16ef2eb72f3c28bfc6
Parents: b809337
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Jun 19 21:55:43 2017 -0400
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200
----------------------------------------------------------------------
.../main/java/org/apache/cxf/ws/rm/DestinationSequence.java | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/78042fae/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 7ac5128..a11eb67 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -406,11 +406,7 @@ public class DestinationSequence extends AbstractSequence {
c.resume();
return;
}
- //next wasn't found, just resume whatever is first...
- for (Map.Entry<Long, Continuation> entry : continuations.entrySet()) {
- entry.getValue().resume();
- return;
- }
+ //next wasn't found, nothing to resume, assume it will come in later...
} finally {
notifyAll();
}
[3/3] cxf git commit: [CXF-7414] Test and fix for ws-rm out of order
issue. Thanks to Dan
Posted by cs...@apache.org.
[CXF-7414] Test and fix for ws-rm out of order issue. Thanks to Dan
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0a6ff4cb
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0a6ff4cb
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0a6ff4cb
Branch: refs/heads/3.1.x-fixes
Commit: 0a6ff4cb91bbb950f66af9d95511ffcb5ee9d9fb
Parents: 35ae5ea
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Mon Jun 19 17:34:19 2017 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200
----------------------------------------------------------------------
.../apache/cxf/ws/rm/DestinationSequence.java | 5 +-
.../ws/rm/DeliveryAssuranceOnewayTest.java | 40 +++++++++++
.../ws/rm/SingleMessageDelaySimulator.java | 75 ++++++++++++++++++++
.../cxf/systest/ws/rm/atleastonce-inorder.xml | 40 +++++++++++
4 files changed, 157 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index d186194..87c7ecb 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -397,9 +397,8 @@ public class DestinationSequence extends AbstractSequence {
}
}
synchronized void wakeupAll() {
- while (!continuations.isEmpty()) {
- Continuation c = continuations.remove(0);
- c.resume();
+ if (!continuations.isEmpty()) {
+ continuations.remove(0).resume();
}
notifyAll();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
index ee9b45f..6fd8f7b 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
@@ -25,7 +25,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.jws.WebService;
@@ -236,6 +238,44 @@ public class DeliveryAssuranceOnewayTest extends AbstractBusClientServerTestBase
assertTrue("Message out of order", argNum < callArgs.length);
}
}
+
+ @Test
+ public void testOnewayAtLeastOnceInOrderDelay() throws Exception {
+ int numMessages = 4;
+ init("org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml", null);
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ SingleMessageDelaySimulator sps = new SingleMessageDelaySimulator();
+ sps.setDelay(600L);
+ greeterBus.getOutInterceptors().add(sps);
+ int num = 1;
+ greeter.greetMe(Integer.toString(num++));
+ for (int c = 2; c <= numMessages; c++) {
+ int currentNum = num++;
+ Thread.sleep(100);
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ greeter.greetMe(Integer.toString(currentNum));
+ }
+ });
+ }
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ LOG.info("Waiting for " + numMessages + " messages to arrive");
+ awaitMessages(numMessages, 1000, 10000);
+ List<String> actualArgs = GreeterProvider.CALL_ARGS;
+ assertEquals("Some messages were not received", numMessages, actualArgs.size());
+ assertInOrder(actualArgs);
+ }
+
+ private void assertInOrder(List<String> actualArgs) {
+ int argNum = 0;
+ for (String actual : actualArgs) {
+ argNum++;
+ assertEquals(Integer.toString(argNum), actual);
+ }
+ }
@Test
public void testAtMostOnceInOrder() throws Exception {
http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java
new file mode 100644
index 0000000..369efa5
--- /dev/null
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SingleMessageDelaySimulator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+
+/**
+ * Delay a single message
+ */
+public class SingleMessageDelaySimulator extends AbstractPhaseInterceptor<Message> {
+ private static final Logger LOG = LogUtils.getLogger(SingleMessageDelaySimulator.class);
+
+ private long delay = 10000L;
+ private long messageNumber = 2;
+
+ public SingleMessageDelaySimulator() {
+ this(Phase.USER_PROTOCOL);
+ }
+
+ public SingleMessageDelaySimulator(String p) {
+ super(p);
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+
+ public void setMessageNumber(long messageNumber) {
+ this.messageNumber = messageNumber;
+ }
+
+ public void handleMessage(Message message) throws Fault {
+ RMProperties rmProps = (RMProperties)message.get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
+ if (rmProps != null && rmProps.getMessageNumber() == messageNumber) {
+ sleep();
+ }
+ }
+
+ private void sleep() {
+ LOG.log(Level.INFO, "sleeping " + delay + " msec ...");
+ try {
+ Thread.sleep(delay);
+ LOG.log(Level.INFO, "continuing");
+ } catch (InterruptedException e) {
+ LOG.log(Level.INFO, "interrupted");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0a6ff4cb/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml
new file mode 100644
index 0000000..064c044
--- /dev/null
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce-inorder.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager" xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+
+ xsi:schemaLocation=" http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
+ http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/transports/http/configuration"
+>
+ <import resource="rminterceptors.xml"/>
+
+ <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="10000"/>
+ </wsrm-policy:RMAssertion>
+ <wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:AtLeastOnce/>
+ <wsrm-mgr:InOrder/>
+ </wsrm-mgr:deliveryAssurance>
+ <wsrm-mgr:destinationPolicy>
+ <wsrm-mgr:acksPolicy intraMessageThreshold="0"/>
+ </wsrm-mgr:destinationPolicy>
+ </wsrm-mgr:rmManager>
+</beans>
[2/3] cxf git commit: [CXF-7414] More updates to resume the "next"
message instead of whichever was first in the continuation list
Posted by cs...@apache.org.
[CXF-7414] More updates to resume the "next" message instead of whichever was first in the continuation list
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/b8093370
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/b8093370
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/b8093370
Branch: refs/heads/3.1.x-fixes
Commit: b80933704fdd12e9e7c872f9ffbae2d554b2b18d
Parents: 0a6ff4c
Author: Daniel Kulp <dk...@apache.org>
Authored: Mon Jun 19 11:49:03 2017 -0400
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Tue Jun 20 08:37:52 2017 +0200
----------------------------------------------------------------------
.../apache/cxf/ws/rm/DestinationSequence.java | 29 ++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/b8093370/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 87c7ecb..7ac5128 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -24,10 +24,11 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
+import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,7 +68,8 @@ public class DestinationSequence extends AbstractSequence {
private volatile long inProcessNumber;
private volatile long highNumberCompleted;
private long nextInOrder;
- private List<Continuation> continuations = new LinkedList<Continuation>();
+ //be careful, must be used in sync block
+ private Map<Long, Continuation> continuations = new TreeMap<Long, Continuation>();
// this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd messages
private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
@@ -383,7 +385,7 @@ public class DestinationSequence extends AbstractSequence {
if (continuation != null) {
continuation.setObject(message);
if (continuation.suspend(-1)) {
- continuations.add(continuation);
+ continuations.put(mn, continuation);
throw new SuspendedInvocationException();
}
}
@@ -396,17 +398,28 @@ public class DestinationSequence extends AbstractSequence {
}
}
}
- synchronized void wakeupAll() {
- if (!continuations.isEmpty()) {
- continuations.remove(0).resume();
+ synchronized void wakeupNext(long i) {
+ try {
+ Continuation c = continuations.remove(i + 1);
+ if (c != null) {
+ //next was found, don't resume everything, just the next one
+ c.resume();
+ return;
+ }
+ //next wasn't found, just resume whatever is first...
+ for (Map.Entry<Long, Continuation> entry : continuations.entrySet()) {
+ entry.getValue().resume();
+ return;
+ }
+ } finally {
+ notifyAll();
}
- notifyAll();
}
synchronized void processingComplete(long mn) {
inProcessNumber = 0;
highNumberCompleted = mn;
- wakeupAll();
+ wakeupNext(mn);
}
void purgeAcknowledged(long messageNr) {