You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2006/11/27 10:33:24 UTC
svn commit: r479557 - in /incubator/cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
systests/src/test/java/org/apache/cxf/systest/ws/rm/
Author: andreasmyth
Date: Mon Nov 27 01:33:23 2006
New Revision: 479557
URL: http://svn.apache.org/viewvc?view=rev&rev=479557
Log:
[JIRA CXF-140] Message loss simulator and system test for retransmission of messages in oneway case.
Added:
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java (with props)
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml (with props)
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml (with props)
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties?view=diff&rev=479557&r1=479556&r2=479557
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/Messages.properties Mon Nov 27 01:33:23 2006
@@ -2,6 +2,7 @@
SOAP_HEADER_DECODE_FAILURE_MSG = Failed to decode RM properties from SOAP headers.
RESEND_MSG = WS-RM retransmission of message {0}.
+RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG = Candidates were acknowledged while iterating for resend.
RESEND_FAILED_MSG = WS-RM retransmission failed.
RESEND_INITIATION_FAILED_MSG = Failed to initiate retransmission.
NO_TRANSPORT_FOR_RESEND_MSG = No transport available for WS-RM retransmission.
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=479557&r1=479556&r2=479557
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Mon Nov 27 01:33:23 2006
@@ -26,6 +26,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -126,6 +127,7 @@
public void purgeAcknowledged(SourceSequence seq) {
Collection<BigInteger> purged = new ArrayList<BigInteger>();
synchronized (this) {
+ LOG.fine("Start purging resend candidates.");
List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
if (null != sequenceCandidates) {
for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
@@ -141,6 +143,7 @@
}
}
}
+ LOG.fine("Completed purging resend candidates.");
}
if (purged.size() > 0) {
RMStore store = manager.getStore();
@@ -312,23 +315,33 @@
* resend attempt.
*/
protected class ResendInitiator implements Runnable {
- public void run() {
+ public void run() {
// iterate over resend candidates, resending any that are due
synchronized (RetransmissionQueueImpl.this) {
+ LOG.fine("Starting ResendInitiator on thread " + Thread.currentThread());
Iterator<Map.Entry<String, List<ResendCandidate>>> sequences = candidates.entrySet()
.iterator();
while (sequences.hasNext()) {
Iterator<ResendCandidate> sequenceCandidates = sequences.next().getValue().iterator();
boolean requestAck = true;
- while (sequenceCandidates.hasNext()) {
- ResendCandidate candidate = sequenceCandidates.next();
- if (candidate.isDue()) {
- candidate.initiate(requestAck);
- requestAck = false;
+ try {
+ while (sequenceCandidates.hasNext()) {
+ ResendCandidate candidate = sequenceCandidates.next();
+ if (candidate.isDue()) {
+ candidate.initiate(requestAck);
+ requestAck = false;
+ }
}
+ } catch (ConcurrentModificationException ex) {
+ // TODO:
+ // can happen if resend occurs on same thread as resend initiation
+ // i.e. when endpoint's executor executes on current thread
+ LOG.log(Level.WARNING, "RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG");
}
}
+ LOG.fine("Completed ResendInitiator");
}
+
}
}
Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java?view=auto&rev=479557
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java Mon Nov 27 01:33:23 2006
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.util.ListIterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.interceptor.MessageSenderInterceptor;
+import org.apache.cxf.io.AbstractWrappedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptor;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.rm.RMContextUtils;
+
+/**
+ * Test interceptor that causes every second outbound application message to be lost.
+ */
+public class MessageLossSimulator extends AbstractPhaseInterceptor<Message> {
+
+ private static final Logger LOG = Logger.getLogger(MessageLossSimulator.class.getName());
+ private int appMessageCount;
+ // Registers in the PREPARE_SEND phase, ordered before MessageSenderInterceptor.
+ public MessageLossSimulator() {
+ super();
+ setPhase(Phase.PREPARE_SEND);
+ addBefore(MessageSenderInterceptor.class.getName());
+ }
+
+ public void handleMessage(Message message) throws Fault {
+ AddressingProperties maps =
+ RMContextUtils.retrieveMAPs(message, false, true);
+ RMContextUtils.ensureExposedVersion(maps);
+ String action = null;
+ if (maps != null && null != maps.getAction()) {
+ action = maps.getAction().getValue();
+ }
+ if (!RMContextUtils.isAplicationMessage(action)) { // only application messages are counted
+ return;
+ }
+ appMessageCount++;
+ if (0 != (appMessageCount % 2)) { // odd-numbered application messages pass through untouched
+ return;
+ }
+ // even-numbered application message: take the sender out of the chain for this message
+ InterceptorChain chain = message.getInterceptorChain();
+ ListIterator it = chain.getIterator();
+ while (it.hasNext()) {
+ PhaseInterceptor pi = (PhaseInterceptor)it.next();
+ if (MessageSenderInterceptor.class.getName().equals(pi.getId())) {
+ chain.remove(pi);
+ LOG.fine("Removed MessageSenderInterceptor from interceptor chain.");
+ break;
+ }
+ }
+
+ message.setContent(OutputStream.class, new WrappedOutputStream(message));
+ }
+
+ private class WrappedOutputStream extends AbstractWrappedOutputStream {
+
+ public WrappedOutputStream(Message m) {
+ super(m);
+ // no state of its own; the message is handed to the superclass
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ // no-op: this stream takes no action on close
+
+ }
+
+ @Override
+ protected void doFlush() throws IOException {
+ boolean af = alreadyFlushed();
+ if (!af) {
+ if (LOG.isLoggable(Level.FINE)) {
+ BigInteger nr = RMContextUtils.retrieveRMProperties(outMessage, true)
+ .getSequence().getMessageNumber();
+ LOG.fine("Losing message " + nr);
+ }
+ resetOut(new DummyOutputStream(), true); // presumably swaps in the discarding stream - TODO confirm resetOut semantics
+ }
+ }
+
+ @Override
+ protected void onWrite() throws IOException {
+ // no-op: this stream takes no action on write
+
+ }
+
+ }
+
+ private class DummyOutputStream extends OutputStream {
+
+ @Override
+ public void write(int b) throws IOException {
+ // no-op: the byte is discarded
+
+ }
+
+ }
+
+
+
+}
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java?view=diff&rev=479557&r1=479556&r2=479557
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java Mon Nov 27 01:33:23 2006
@@ -20,7 +20,6 @@
package org.apache.cxf.systest.ws.rm;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
@@ -60,10 +59,6 @@
if (os instanceof AbstractCachedOutputStream) {
((AbstractCachedOutputStream)os).registerCallback(new RecorderCallback());
}
- /*
- ForkOutputStream fos = new ForkOutputStream(os);
- message.setContent(OutputStream.class, fos);
- */
}
@Override
@@ -75,59 +70,12 @@
protected List<byte[]> getOutboundMessages() {
return outbound;
}
-
- /**
- * Output stream that multicasts its data to several underlying output streams.
- */
- class ForkOutputStream extends OutputStream {
-
- final OutputStream original;
- ByteArrayOutputStream bos;
-
- public ForkOutputStream(OutputStream o) {
- original = o;
- bos = new ByteArrayOutputStream();
- }
-
- @Override
- public void close() throws IOException {
- original.close();
- bos.close();
- outbound.add(bos.toByteArray());
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("outbound: " + bos.toString());
- }
- }
-
- @Override
- public void flush() throws IOException {
- original.flush();
- bos.flush();
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- original.write(b, off, len);
- bos.write(b, off, len);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- original.write(b);
- bos.write(b);
- }
-
- @Override
- public void write(int b) throws IOException {
- original.write(b);
- bos.write(b);
- }
- }
class RecorderCallback implements CachedOutputStreamCallback {
public void onFlush(AbstractCachedOutputStream cos) {
// LOG.fine("flushing wrapped output stream: " + cos.getOut().getClass().getName());
+
OutputStream os = cos.getOut();
if (os instanceof ByteArrayOutputStream) {
ByteArrayOutputStream bos = (ByteArrayOutputStream)os;
@@ -136,7 +84,6 @@
LOG.fine("outbound: " + bos.toString());
}
}
-
}
public void onClose(AbstractCachedOutputStream cos) {
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=479557&r1=479556&r2=479557
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Mon Nov 27 01:33:23 2006
@@ -63,7 +63,7 @@
private OutMessageRecorder outRecorder;
private InMessageRecorder inRecorder;
- private boolean testAll = true;
+ private boolean testAll;
private boolean doTestOnewayAnonymousAcks = testAll;
private boolean doTestOnewayDeferredAnonymousAcks = testAll;
private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
@@ -72,6 +72,7 @@
private boolean doTestTwowayNonAnonymous = testAll;
private boolean doTestTwowayNonAnonymousDeferred = testAll;
private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
+ private boolean doTestOnewayMessageLoss = true;
public static void main(String[] args) {
junit.textui.TestRunner.run(SequenceTest.class);
@@ -499,6 +500,55 @@
expected[1] = true;
expected[5] = true;
mf.verifyAcknowledgements(expected, false);
+ }
+
+ public void testOnewayMessageLoss() throws Exception {
+ if (!doTestOnewayMessageLoss) {
+ return;
+ }
+ setupGreeter("org/apache/cxf/systest/ws/rm/oneway-message-loss.xml");
+ // add the loss simulator to the outbound chain and set the base retransmission interval to 2000 ms
+ greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+ // four oneway invocations; the simulator loses every second application message
+ greeter.greetMeOneWay("one");
+ greeter.greetMeOneWay("two");
+ greeter.greetMeOneWay("three");
+ greeter.greetMeOneWay("four");
+
+ awaitMessages(7, 5, 10000);
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ // Expected outbound:
+ // CreateSequence
+ // + 4 greetMe messages
+ // + at least 2 resends (messages may be resent multiple times depending
+ // on the timing of the ACKs)
+
+ String[] expectedActions = new String[7];
+ expectedActions[0] = RMConstants.getCreateSequenceAction();
+ for (int i = 1; i < expectedActions.length; i++) {
+ expectedActions[i] = GREETMEONEWAY_ACTION;
+ }
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2", "3", "4", "2", "4"}, true, false);
+ mf.verifyLastMessage(new boolean[7], true);
+ mf.verifyAcknowledgements(new boolean[7], true);
+
+ // Expected inbound:
+ // createSequenceResponse
+ // + 2 partial responses to successfully transmitted messages
+ // + 2 partial responses to resent messages
+
+ mf.verifyMessages(5, false);
+ expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction(),
+ null, null, null, null};
+ mf.verifyActions(expectedActions, false);
+ mf.verifyMessageNumbers(new String[] {null, null, null, null, null}, false);
+ mf.verifyAcknowledgements(new boolean[] {false, true, true, true, true}, false);
+
}
// --- test utilities ---
Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml?view=auto&rev=479557
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml Mon Nov 27 01:33:23 2006
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:http-conf="http://cxf.apache.org/transports/http/configuration"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <bean id="org.apache.cxf.ws.rm.RMManager" class="org.apache.cxf.ws.rm.RMManager">
+ <property name="bus" ref="cxf"/>
+ <property name="RMAssertion">
+ <value>
+ <wsrm-policy:RMAssertion>
+ <!-- avoid server side resends, change programmatically for client -->
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="2000"/>
+ </wsrm-policy:RMAssertion>
+ </value>
+ </property>
+ </bean>
+
+ <import resource="rminterceptors.xml"/>
+
+</beans>
\ No newline at end of file
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-message-loss.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml?view=auto&rev=479557
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml Mon Nov 27 01:33:23 2006
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xmlns:http-conf="http://cxf.apache.org/transports/http/configuration"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <!--
+ <bean name="{http://cxf.apache.org/greeter_control}GreeterPort.http-conduit" abstract="true">
+ <property name="client">
+ <value>
+ <http-conf:client DecoupledEndpoint="http://localhost:9993/decoupled_endpoint"/>
+ </value>
+ </property>
+ </bean>
+ -->
+
+ <bean id="org.apache.cxf.ws.rm.RMManager" class="org.apache.cxf.ws.rm.RMManager">
+ <property name="bus" ref="cxf"/>
+ <property name="RMAssertion">
+ <value>
+ <wsrm-policy:RMAssertion>
+ <!-- avoid server side resends, change programmatically for client -->
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="2000"/>
+ </wsrm-policy:RMAssertion>
+ </value>
+ </property>
+ </bean>
+
+ <import resource="rminterceptors.xml"/>
+
+</beans>
\ No newline at end of file
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/twoway-message-loss.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml