You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2006/11/21 15:50:54 UTC
svn commit: r477690 - in /incubator/cxf/trunk:
api/src/main/java/org/apache/cxf/io/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/a...
Author: andreasmyth
Date: Tue Nov 21 06:50:53 2006
New Revision: 477690
URL: http://svn.apache.org/viewvc?view=rev&rev=477690
Log:
[JIRA CXF-138, CXF-140] RetransmissionQueue implementation (client side resends only at the moment)
Added system test testOnewayAnonymousAcksSuppressed.
Added:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
- copied, changed from r477271, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml (with props)
Modified:
incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java
incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java Tue Nov 21 06:50:53 2006
@@ -34,6 +34,7 @@
import java.io.PipedOutputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.cxf.common.util.Base64Utility;
@@ -69,6 +70,16 @@
callbacks = new ArrayList<CachedOutputStreamCallback>();
}
callbacks.add(cb);
+ }
+
+    /**
+     * Removes a previously registered callback. Does nothing when no callback
+     * list has been created yet.
+     *
+     * @param cb the callback to remove
+     */
+    public void deregisterCallback(CachedOutputStreamCallback cb) {
+        if (callbacks == null) {
+            return;
+        }
+        callbacks.remove(cb);
+    }
+
+ public List<CachedOutputStreamCallback> getCallbacks() {
+ return callbacks == null ? null : Collections.unmodifiableList(callbacks);
}
/**
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Tue Nov 21 06:50:53 2006
@@ -19,14 +19,12 @@
package org.apache.cxf.ws.rm;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.phase.PhaseInterceptor;
@@ -94,31 +92,5 @@
// rm logic
abstract void handleMessage(Message msg, boolean isFault) throws SequenceFault;
-
- protected boolean isAplicationMessage(String action) {
- if (RMConstants.getCreateSequenceAction().equals(action)
- || RMConstants.getCreateSequenceResponseAction().equals(action)
- || RMConstants.getTerminateSequenceAction().equals(action)
- || RMConstants.getLastMessageAction().equals(action)
- || RMConstants.getSequenceAcknowledgmentAction().equals(action)
- || RMConstants.getSequenceInfoAction().equals(action)) {
- return false;
- }
- return true;
- }
-
- protected boolean isPartialResponse(Message msg) {
- return RMContextUtils.isOutbound(msg)
- && msg.getContent(List.class) == null
- && getException(msg.getExchange()) == null;
- }
-
- private Exception getException(Exchange exchange) {
- if (exchange.getOutFaultMessage() != null) {
- return exchange.getOutFaultMessage().getContent(Exception.class);
- } else if (exchange.getInFaultMessage() != null) {
- return exchange.getInFaultMessage().getContent(Exception.class);
- }
- return null;
- }
+
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Tue Nov 21 06:50:53 2006
@@ -19,7 +19,10 @@
package org.apache.cxf.ws.rm;
+import java.util.List;
+
import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
@@ -31,12 +34,12 @@
public final class RMContextUtils {
- /**
+ /**
* Prevents instantiation.
*/
protected RMContextUtils() {
}
-
+
/**
* @return a generated UUID
*/
@@ -44,7 +47,6 @@
return org.apache.cxf.ws.addressing.ContextUtils.generateUUID();
}
-
/**
* Determine if message is outbound.
*
@@ -54,7 +56,6 @@
public static boolean isOutbound(Message message) {
return org.apache.cxf.ws.addressing.ContextUtils.isOutbound(message);
}
-
/**
* Determine if current messaging role is that of requestor.
@@ -65,7 +66,7 @@
public static boolean isRequestor(Message message) {
return org.apache.cxf.ws.addressing.ContextUtils.isRequestor(message);
}
-
+
/**
* Determine if message is currently being processed on server side.
*
@@ -76,13 +77,44 @@
if (isOutbound(message)) {
return message.getExchange().getInMessage() != null;
} else {
- return message.getExchange().getOutMessage() == null
- && message.getExchange().getOutFaultMessage() == null;
+ return message.getExchange().getOutMessage() == null
+ && message.getExchange().getOutFaultMessage() == null;
}
}
-
+
+    /**
+     * Tells whether a message is a partial response to a oneway request.
+     *
+     * @param message the message to inspect
+     * @return true iff the message is outbound, has no List content and its
+     *         exchange carries no exception
+     */
+    public static boolean isPartialResponse(Message message) {
+        if (!RMContextUtils.isOutbound(message)) {
+            return false;
+        }
+        if (message.getContent(List.class) != null) {
+            return false;
+        }
+        return getException(message.getExchange()) == null;
+    }
+
+ /**
+ * Checks if the action String belongs to an application message.
+ *
+ * @param action the action
+ * @return true iff the action is not one of the RM protocol actions.
+ */
+ public static boolean isAplicationMessage(String action) {
+ if (RMConstants.getCreateSequenceAction().equals(action)
+ || RMConstants.getCreateSequenceResponseAction().equals(action)
+ || RMConstants.getTerminateSequenceAction().equals(action)
+ || RMConstants.getLastMessageAction().equals(action)
+ || RMConstants.getSequenceAcknowledgmentAction().equals(action)
+ || RMConstants.getSequenceInfoAction().equals(action)) {
+ return false;
+ }
+ return true;
+ }
+
/**
* Retrieve the RM properties from the current message.
+ *
* @param message the current message
* @param outbound true iff the message direction is outbound
* @return the RM properties
@@ -106,11 +138,12 @@
}
}
return null;
-
+
}
-
+
/**
* Store the RM properties in the current message.
+ *
* @param message the current message
* @param rmps the RM properties
* @param outbound iff the message direction is outbound
@@ -119,7 +152,7 @@
String key = getRMPropertiesKey(outbound);
message.put(key, rmps);
}
-
+
/**
* Retrieves the addressing properties from the current message.
*
@@ -131,22 +164,22 @@
* @return the current addressing properties
*/
public static AddressingPropertiesImpl retrieveMAPs(Message message, boolean isProviderContext,
- boolean isOutbound) {
+ boolean isOutbound) {
return org.apache.cxf.ws.addressing.ContextUtils.retrieveMAPs(message, isProviderContext, isOutbound);
}
-
+
/**
* Store MAPs in the message.
- *
+ *
* @param maps the MAPs to store
* @param message the current message
* @param isOutbound true iff the message is outbound
* @param isRequestor true iff the current messaging role is that of
- * requestor
+ * requestor
* @param handler true if HANDLER scope, APPLICATION scope otherwise
*/
public static void storeMAPs(AddressingProperties maps, Message message, boolean isProviderContext,
- boolean isOutbound) {
+ boolean isOutbound) {
org.apache.cxf.ws.addressing.ContextUtils.storeMAPs(maps, message, isProviderContext, isOutbound);
}
@@ -167,12 +200,21 @@
* @param message the current Message
* @return the endpoint
*/
- public static Endpoint getEndpoint(Message message) {
+ public static Endpoint getEndpoint(Message message) {
return message.getExchange().get(Endpoint.class);
}
-
+
private static String getRMPropertiesKey(boolean outbound) {
- return outbound ? RMMessageConstants.RM_PROPERTIES_OUTBOUND
- : RMMessageConstants.RM_PROPERTIES_INBOUND;
+ return outbound
+ ? RMMessageConstants.RM_PROPERTIES_OUTBOUND : RMMessageConstants.RM_PROPERTIES_INBOUND;
+ }
+
+ private static Exception getException(Exchange exchange) {
+ if (exchange.getOutFaultMessage() != null) {
+ return exchange.getOutFaultMessage().getContent(Exception.class);
+ } else if (exchange.getInFaultMessage() != null) {
+ return exchange.getInFaultMessage().getContent(Exception.class);
+ }
+ return null;
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Tue Nov 21 06:50:53 2006
@@ -72,7 +72,7 @@
bus.setExtension(this, RMManager.class);
}
if (null == retransmissionQueue) {
- retransmissionQueue = new RetransmissionQueueImpl();
+ retransmissionQueue = new RetransmissionQueueImpl(this);
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Tue Nov 21 06:50:53 2006
@@ -39,6 +39,9 @@
public static final String ORIGINAL_REQUESTOR_ROLE =
"org.apache.cxf.client.original";
+ public static final String SAVED_OUTPUT_STREAM =
+ "org.apache.cxf.ws.rm.outputstream";
+
/**
* Prevents instantiation.
*/
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Tue Nov 21 06:50:53 2006
@@ -73,7 +73,24 @@
LOG.fine("Action: " + action);
}
- boolean isApplicationMessage = isAplicationMessage(action);
+ boolean isApplicationMessage = RMContextUtils.isAplicationMessage(action);
+ boolean isPartialResponse = RMContextUtils.isPartialResponse(message);
+ LOG.fine("isApplicationMessage: " + isApplicationMessage);
+ LOG.fine("isPartialResponse: " + isPartialResponse);
+
+ if (isApplicationMessage && !isPartialResponse) {
+ RetransmissionInterceptor ri = new RetransmissionInterceptor();
+ ri.setManager(getManager());
+ // TODO:
+ // On the server side: If a fault occurs after this interceptor we will switch
+ // interceptor chains (if this is not already a fault message) and therefore need to
+ // make sure the retransmission interceptor is added to the fault chain
+ //
+ message.getInterceptorChain().add(ri);
+ LOG.fine("Added RetransmissionInterceptor to chain.");
+
+ getManager().getRetransmissionQueue().start();
+ }
RMProperties rmpsOut = (RMProperties)RMContextUtils.retrieveRMProperties(message, true);
if (null == rmpsOut) {
@@ -85,7 +102,7 @@
Identifier inSeqId = null;
BigInteger inMessageNumber = null;
- if (isApplicationMessage && !isPartialResponse(message)) {
+ if (isApplicationMessage && !isPartialResponse) {
rmpsIn = (RMProperties)RMContextUtils.retrieveRMProperties(message, false);
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java?view=auto&rev=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java Tue Nov 21 06:50:53 2006
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.io.CachedOutputStreamCallback;
+import org.apache.cxf.message.Message;
+
+/**
+ * Cached output stream callback that, on flush, stores the written bytes on the
+ * message and hands the message to the retransmission queue.
+ */
+public class RetransmissionCallback implements CachedOutputStreamCallback {
+
+    Message message;
+    RMManager manager;
+
+    RetransmissionCallback(Message m, RMManager mgr) {
+        this.message = m;
+        this.manager = mgr;
+    }
+
+    /**
+     * Nothing needs to happen when the stream is closed.
+     *
+     * @param cos the stream being closed
+     */
+    public void onClose(AbstractCachedOutputStream cos) {
+        // no-op
+    }
+
+    /**
+     * Saves the underlying byte array stream under
+     * RMMessageConstants.SAVED_OUTPUT_STREAM and registers the message as
+     * unacknowledged. Streams backed by anything other than a
+     * ByteArrayOutputStream are ignored.
+     *
+     * @param cos the stream being flushed
+     */
+    public void onFlush(AbstractCachedOutputStream cos) {
+        OutputStream underlying = cos.getOut();
+        if (!(underlying instanceof ByteArrayOutputStream)) {
+            return;
+        }
+        message.put(RMMessageConstants.SAVED_OUTPUT_STREAM, (ByteArrayOutputStream)underlying);
+        manager.getRetransmissionQueue().addUnacknowledged(message);
+    }
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java?view=auto&rev=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java Tue Nov 21 06:50:53 2006
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.OutputStream;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * Interceptor that registers a RetransmissionCallback on the message's cached
+ * output stream, for both normal and fault processing.
+ */
+public class RetransmissionInterceptor extends AbstractPhaseInterceptor {
+
+    RMManager manager;
+
+    public RMManager getManager() {
+        return this.manager;
+    }
+
+    public void setManager(RMManager manager) {
+        this.manager = manager;
+    }
+
+    @Override
+    public String getPhase() {
+        return Phase.PRE_PROTOCOL;
+    }
+
+    public void handleMessage(Message message) throws Fault {
+        handle(message, false);
+    }
+
+    @Override
+    public void handleFault(Message message) {
+        handle(message, true);
+    }
+
+    public String getId() {
+        return RetransmissionInterceptor.class.getName();
+    }
+
+    /**
+     * Registers the retransmission callback when a retransmission queue exists
+     * and the message's output stream is an AbstractCachedOutputStream.
+     *
+     * @param message the message being processed
+     * @param isFault true when invoked from fault handling (currently unused)
+     */
+    void handle(Message message, boolean isFault) {
+        if (getManager().getRetransmissionQueue() == null) {
+            return;
+        }
+
+        // instanceof is false for null, so a missing stream is skipped as well
+        OutputStream out = message.getContent(OutputStream.class);
+        if (out instanceof AbstractCachedOutputStream) {
+            AbstractCachedOutputStream cached = (AbstractCachedOutputStream)out;
+            cached.registerCallback(new RetransmissionCallback(message, getManager()));
+        }
+    }
+}
+
+
+
+
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Nov 21 06:50:53 2006
@@ -19,30 +19,98 @@
package org.apache.cxf.ws.rm.soap;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.RetransmissionCallback;
import org.apache.cxf.ws.rm.RetransmissionQueue;
+import org.apache.cxf.ws.rm.SequenceType;
import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMStore;
/**
*
*/
public class RetransmissionQueueImpl implements RetransmissionQueue {
+    private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionQueueImpl.class);
+
+    // delay/period handed to the resend timer (java.util.Timer takes milliseconds)
+    private long baseRetransmissionInterval = 3000L;
+    // factor by which a candidate's skip count grows each time it becomes due
+    private int exponentialBackoff = 2;
+    // resend candidates keyed by sequence identifier value
+    private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
+    private Resender resender;
+    private Runnable resendInitiator;
+    private Timer timer;
+    private RMManager manager;
+
+    /**
+     * @param m the RM manager this queue belongs to
+     */
+    public RetransmissionQueueImpl(RMManager m) {
+        this.manager = m;
+    }
+
+    /**
+     * @return the RM manager
+     */
+    public RMManager getManager() {
+        return this.manager;
+    }
+
+    /**
+     * @param m the RM manager to use
+     */
+    public void setManager(RMManager m) {
+        this.manager = m;
+    }
+
+    /**
+     * @return the base retransmission interval
+     */
+    public long getBaseRetransmissionInterval() {
+        return this.baseRetransmissionInterval;
+    }
+
+    /**
+     * @param baseRetransmissionInterval the base retransmission interval
+     */
+    public void setBaseRetransmissionInterval(long baseRetransmissionInterval) {
+        this.baseRetransmissionInterval = baseRetransmissionInterval;
+    }
+
+    /**
+     * @param exponentialBackoff the exponential backoff
+     */
+    public void setExponentialBackoff(int exponentialBackoff) {
+        this.exponentialBackoff = exponentialBackoff;
+    }
+
public void addUnacknowledged(Message message) {
- // TODO Auto-generated method stub
-
+ cacheUnacknowledged(message);
}
- public int countUnacknowledged(SourceSequence seq) {
- // TODO Auto-generated method stub
- return 0;
+ /**
+ * @param seq the sequence under consideration
+ * @return the number of unacknowledged messages for that sequence
+ */
+ public synchronized int countUnacknowledged(SourceSequence seq) {
+ List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+ return sequenceCandidates == null ? 0 : sequenceCandidates.size();
}
- public boolean isEmpty() {
- // TODO Auto-generated method stub
- return false;
+    /**
+     * Checks the candidate list of every sequence rather than the size of the
+     * map: a sequence keeps its (possibly empty) list in the map after all of
+     * its messages have been purged, so the map size alone would keep
+     * reporting a non-empty queue.
+     *
+     * @return true if there are no unacknowledged messages in the queue
+     */
+    public synchronized boolean isEmpty() {
+        for (List<ResendCandidate> sequenceCandidates : getUnacknowledged().values()) {
+            if (!sequenceCandidates.isEmpty()) {
+                return false;
+            }
+        }
+        return true;
     }
public void populate(Collection<SourceSequence> sss) {
@@ -50,19 +118,373 @@
}
+ /**
+ * Purge all candidates for the given sequence that have been acknowledged.
+ *
+ * @param seq the sequence object.
+ */
public void purgeAcknowledged(SourceSequence seq) {
- // TODO Auto-generated method stub
-
+ Collection<BigInteger> purged = new ArrayList<BigInteger>();
+ synchronized (this) {
+ List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+ if (null != sequenceCandidates) {
+ for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
+ ResendCandidate candidate = sequenceCandidates.get(i);
+ RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
+ true);
+ SequenceType st = properties.getSequence();
+ BigInteger m = st.getMessageNumber();
+ if (seq.isAcknowledged(m)) {
+ sequenceCandidates.remove(i);
+ candidate.resolved();
+ purged.add(m);
+ }
+ }
+ }
+ }
+ if (purged.size() > 0) {
+ RMStore store = manager.getStore();
+ if (null != store) {
+ store.removeMessages(seq.getIdentifier(), purged, true);
+ }
+ }
}
+ /**
+ * Initiate resends.
+ *
+ * The resend initiator is scheduled on a timer; nothing happens if a timer already exists.
+ */
public void start() {
- // TODO Auto-generated method stub
+ if (null != timer) {
+ return;
+ }
+ LOG.fine("Starting retransmission queue");
+
+ // setup resender
+ if (null == resender) {
+ resender = getDefaultResender();
+ }
+ // start resend initiator
+ TimerTask task = new TimerTask() {
+ public void run() {
+ getResendInitiator().run();
+ }
+ };
+ timer = new Timer();
+ // TODO
+ // delay starting the queue to give the first request a chance to be sent before
+ // waiting for another period.
+ timer.schedule(task, getBaseRetransmissionInterval() / 2, getBaseRetransmissionInterval());
}
+ /**
+ * Stops retransmission queue.
+ */
public void stop() {
- // TODO Auto-generated method stub
-
+ if (null != timer) {
+ LOG.fine("Stopping retransmission queue");
+ timer.cancel();
+ timer = null;
+ }
+ }
+
+    /**
+     * @return the factor applied to a candidate's skip count when it becomes due
+     */
+    protected int getExponentialBackoff() {
+        return this.exponentialBackoff;
+    }
+
+    /**
+     * Returns the resend initiator, creating it on first use.
+     *
+     * @return the ResendInitiator
+     */
+    protected Runnable getResendInitiator() {
+        Runnable initiator = resendInitiator;
+        if (null == initiator) {
+            initiator = new ResendInitiator();
+            resendInitiator = initiator;
+        }
+        return initiator;
+    }
+
+    /**
+     * Wraps a message in a new resend candidate.
+     *
+     * @param message the message
+     * @return a ResendCandidate for that message
+     */
+    protected ResendCandidate createResendCandidate(Message message) {
+        ResendCandidate created = new ResendCandidate(message);
+        return created;
+    }
+
+    /**
+     * Accepts a new resend candidate and files it under the identifier of the
+     * sequence named in the message's outbound RM properties.
+     *
+     * @param message the unacknowledged message
+     * @return the ResendCandidate created for the message
+     */
+    protected ResendCandidate cacheUnacknowledged(Message message) {
+        RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
+        Identifier sid = rmps.getSequence().getIdentifier();
+        ResendCandidate candidate;
+        synchronized (this) {
+            String key = sid.getValue();
+            List<ResendCandidate> forSequence = getSequenceCandidates(key);
+            if (forSequence == null) {
+                // first unacknowledged message of this sequence
+                forSequence = new ArrayList<ResendCandidate>();
+                candidates.put(key, forSequence);
+            }
+            candidate = new ResendCandidate(message);
+            forSequence.add(candidate);
+        }
+        LOG.fine("Cached unacknowledged message.");
+        return candidate;
+    }
+
+    /**
+     * @return a map relating each sequence ID to the list of unacknowledged
+     *         messages for that sequence
+     */
+    protected Map<String, List<ResendCandidate>> getUnacknowledged() {
+        return this.candidates;
+    }
+
+    /**
+     * Looks up the resend candidates of a sequence by its identifier value.
+     *
+     * @param seq the sequence under consideration
+     * @return the list of resend candidates for that sequence
+     * @pre called with mutex held
+     */
+    protected List<ResendCandidate> getSequenceCandidates(SourceSequence seq) {
+        String key = seq.getIdentifier().getValue();
+        return getSequenceCandidates(key);
+    }
+
+    /**
+     * Looks up the resend candidates filed under a sequence identifier value.
+     *
+     * @param key the sequence identifier under consideration
+     * @return the list of resend candidates for that sequence
+     * @pre called with mutex held
+     */
+    protected List<ResendCandidate> getSequenceCandidates(String key) {
+        return this.candidates.get(key);
+    }
+
+    /**
+     * Resends a message from the client side: asks the exchange's conduit to
+     * send the message again, re-registers the callbacks of the previous output
+     * stream (except the retransmission callback) on the new one and replays
+     * the saved bytes into it. I/O failures are logged, not propagated.
+     *
+     * @param message the message to resend
+     */
+    private void clientResend(Message message) {
+        Conduit c = message.getExchange().getConduit();
+        try {
+
+            // Remember the callbacks registered on the current output stream. A copy is
+            // taken because getCallbacks() may return null and otherwise returns a live
+            // view that must not be iterated while callbacks are being registered.
+            OutputStream os = message.getContent(OutputStream.class);
+            List<CachedOutputStreamCallback> callbacks = null;
+            if (os instanceof AbstractCachedOutputStream) {
+                List<CachedOutputStreamCallback> registered =
+                    ((AbstractCachedOutputStream)os).getCallbacks();
+                if (null != registered) {
+                    callbacks = new ArrayList<CachedOutputStreamCallback>(registered);
+                }
+            }
+
+            c.send(message);
+
+            // re-register all callbacks except the retransmission callback
+            os = message.getContent(OutputStream.class);
+            if (os instanceof AbstractCachedOutputStream && null != callbacks) {
+                for (CachedOutputStreamCallback cb : callbacks) {
+                    if (!(cb instanceof RetransmissionCallback)) {
+                        ((AbstractCachedOutputStream)os).registerCallback(cb);
+                    }
+                }
+            }
+            ByteArrayOutputStream savedOutputStream =
+                (ByteArrayOutputStream)message.get(RMMessageConstants.SAVED_OUTPUT_STREAM);
+            ByteArrayInputStream bis = new ByteArrayInputStream(savedOutputStream.toByteArray());
+
+            // copy saved output stream to new output stream in chunks of 1024
+            AbstractCachedOutputStream.copyStream(bis, os, 1024);
+            os.flush();
+            os.close();
+        } catch (IOException ex) {
+            LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
+        }
+    }
+
+    /**
+     * Server side resend; not implemented yet.
+     *
+     * @param message the message to resend
+     */
+    private void serverResend(Message message) {
+        // TODO
+    }
+
+    /**
+     * Manages scheduling of resend attempts. Each run walks all sequences and
+     * initiates a resend for every candidate that reports itself as due.
+     */
+    protected class ResendInitiator implements Runnable {
+        public void run() {
+            synchronized (RetransmissionQueueImpl.this) {
+                for (List<ResendCandidate> sequenceCandidates : candidates.values()) {
+                    // only the first due candidate of a sequence requests an acknowledgement
+                    boolean requestAck = true;
+                    for (ResendCandidate candidate : sequenceCandidates) {
+                        if (!candidate.isDue()) {
+                            continue;
+                        }
+                        candidate.initiate(requestAck);
+                        requestAck = false;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Represents a candidate for resend, i.e. an unacked outgoing message. When
+     * this is determined as due another resend attempt, an asynchronous task is
+     * scheduled for this purpose.
+     */
+    protected class ResendCandidate implements Runnable {
+        private Message message;
+        private int skips;
+        private int skipped;
+        private boolean pending;
+        private boolean includeAckRequested;
+
+        /**
+         * @param m the unacked message
+         */
+        protected ResendCandidate(Message m) {
+            message = m;
+            skipped = -1;
+            skips = 1;
+        }
+
+        /**
+         * Async resend logic.
+         */
+        public void run() {
+            try {
+                // ensure ACK wasn't received while this task was enqueued
+                // on executor
+                if (isPending()) {
+                    resender.resend(message, includeAckRequested);
+                    includeAckRequested = false;
+                }
+            } finally {
+                attempted();
+            }
+        }
+
+        /**
+         * @return true if candidate is due a resend REVISIT should bound the
+         *         max number of resend attempts
+         */
+        protected synchronized boolean isDue() {
+            boolean due = false;
+            // skip count is used to model exponential backoff
+            // to avoid gratuitous time evaluation
+            if (!pending && ++skipped == skips) {
+                skips *= getExponentialBackoff();
+                skipped = 0;
+                due = true;
+            }
+            return due;
+        }
+
+        /**
+         * @return true if a resend attempt is pending
+         */
+        protected synchronized boolean isPending() {
+            return pending;
+        }
+
+        /**
+         * Initiate resend asynchronously on the endpoint's executor, falling
+         * back to the service's executor when the endpoint has none.
+         *
+         * @param requestAcknowledge true if an AckRequest header is to be sent
+         *            with resend
+         */
+        protected synchronized void initiate(boolean requestAcknowledge) {
+            includeAckRequested = requestAcknowledge;
+            pending = true;
+            Endpoint ep = message.getExchange().get(Endpoint.class);
+            Executor executor = ep.getExecutor();
+            if (null == executor) {
+                executor = ep.getService().getExecutor();
+            }
+            try {
+                executor.execute(this);
+            } catch (RejectedExecutionException ex) {
+                // run() will never be invoked for this attempt, so attempted() cannot
+                // clear the flag; reset it here, otherwise isDue() would never select
+                // this candidate again
+                pending = false;
+                LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
+            }
+        }
+
+        /**
+         * ACK has been received for this candidate.
+         */
+        protected synchronized void resolved() {
+            pending = false;
+            skips = Integer.MAX_VALUE;
+        }
+
+        /**
+         * @return the associated message
+         */
+        protected Message getMessage() {
+            return message;
+        }
+
+        /**
+         * A resend has been attempted.
+         */
+        private synchronized void attempted() {
+            pending = false;
+        }
+    }
+
+ /**
+ * Encapsulates actual resend logic (pluggable to facilitate unit testing).
+ */
+ public interface Resender {
+ /**
+ * Resend mechanics.
+ *
+ * @param message the message to resend
+ * @param requestAcknowledge true if an acknowledgement request should be
+ * included with the resend
+ */
+ void resend(Message message, boolean requestAcknowledge);
+ }
+
+ /**
+ * Create default Resender logic.
+ *
+ * @return default Resender
+ */
+ protected final Resender getDefaultResender() {
+ return new Resender() {
+ public void resend(Message message, boolean requestAcknowledge) {
+ RMProperties properties = RMContextUtils.retrieveRMProperties(message, true);
+ SequenceType st = properties.getSequence();
+ if (st != null) {
+ LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber());
+ }
+ try {
+ // TODO: remove previously added acknowledgments and update
+ // message id (to avoid duplicates)
+
+ if (RMContextUtils.isRequestor(message)) {
+ clientResend(message);
+ } else {
+ serverResend(message);
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
+ }
+ }
+ };
+ };
+
+    /**
+     * Plug in replacement resend logic (facilitates unit testing).
+     *
+     * @param replacement the resend logic to use from now on
+     */
+    protected void replaceResender(Resender replacement) {
+        this.resender = replacement;
+    }
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java Tue Nov 21 06:50:53 2006
@@ -23,9 +23,12 @@
import java.math.BigInteger;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import junit.framework.TestCase;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.phase.PhaseInterceptorChain;
@@ -70,17 +73,28 @@
};
RMOutInterceptor interceptor = control.createMock(RMOutInterceptor.class, mocked);
RMManager manager = control.createMock(RMManager.class);
- EasyMock.expect(interceptor.getManager()).andReturn(manager).times(3);
+ EasyMock.expect(interceptor.getManager()).andReturn(manager).times(5);
Message message = control.createMock(Message.class);
-
+ Exchange ex = control.createMock(Exchange.class);
+ EasyMock.expect(message.getExchange()).andReturn(ex).times(2);
+ EasyMock.expect(ex.getOutMessage()).andReturn(message);
+ EasyMock.expect(message.getContent(List.class)).andReturn(Collections.singletonList("CXF"));
EasyMock.expect(message.get(Message.REQUESTOR_ROLE)).andReturn(Boolean.TRUE).anyTimes();
EasyMock.expect(message.get(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND))
.andReturn(maps).anyTimes();
RMProperties rmpsOut = new RMProperties();
EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_OUTBOUND)).andReturn(rmpsOut);
EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(null);
-
+ InterceptorChain chain = control.createMock(InterceptorChain.class);
+ EasyMock.expect(message.getInterceptorChain()).andReturn(chain);
+ chain.add(EasyMock.isA(RetransmissionInterceptor.class));
+ EasyMock.expectLastCall();
+ RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+ EasyMock.expect(manager.getRetransmissionQueue()).andReturn(queue);
+ queue.start();
+ EasyMock.expectLastCall();
+
Source source = control.createMock(Source.class);
EasyMock.expect(manager.getSource(message)).andReturn(source);
Destination destination = control.createMock(Destination.class);
Copied: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (from r477271, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=477690&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java&r1=477271&p2=incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java&r2=477690
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue Nov 21 06:50:53 2006
@@ -1,65 +1,61 @@
-package org.objectweb.celtix.ws.rm.soap;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.soap;
+
+import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
-
-import javax.xml.soap.Name;
-import javax.xml.soap.SOAPEnvelope;
-import javax.xml.soap.SOAPHeader;
-import javax.xml.soap.SOAPHeaderElement;
-import javax.xml.soap.SOAPMessage;
-import javax.xml.soap.SOAPPart;
-import javax.xml.ws.handler.MessageContext;
+import java.util.concurrent.Executor;
import junit.framework.TestCase;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.SequenceType;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMStore;
import org.easymock.IMocksControl;
import org.easymock.classextension.EasyMock;
-import org.objectweb.celtix.bindings.AbstractBindingBase;
-import org.objectweb.celtix.bindings.AbstractBindingImpl;
-import org.objectweb.celtix.bindings.DataBindingCallback;
-import org.objectweb.celtix.bindings.ServerDataBindingCallback;
-import org.objectweb.celtix.context.InputStreamMessageContext;
-import org.objectweb.celtix.context.ObjectMessageContext;
-import org.objectweb.celtix.context.OutputStreamMessageContext;
-import org.objectweb.celtix.handlers.HandlerInvoker;
-import org.objectweb.celtix.transports.ClientTransport;
-import org.objectweb.celtix.transports.ServerTransport;
-import org.objectweb.celtix.transports.Transport;
-import org.objectweb.celtix.workqueue.WorkQueue;
-import org.objectweb.celtix.ws.addressing.AddressingProperties;
-import org.objectweb.celtix.ws.addressing.AddressingPropertiesImpl;
-import org.objectweb.celtix.ws.addressing.soap.MAPCodec;
-import org.objectweb.celtix.ws.rm.Identifier;
-import org.objectweb.celtix.ws.rm.Names;
-import org.objectweb.celtix.ws.rm.RMProperties;
-import org.objectweb.celtix.ws.rm.SequenceType;
-import org.objectweb.celtix.ws.rm.SourceSequence;
-import org.objectweb.celtix.ws.rm.persistence.RMMessage;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
-
-import static org.objectweb.celtix.bindings.JAXWSConstants.DATABINDING_CALLBACK_PROPERTY;
-import static org.objectweb.celtix.context.ObjectMessageContext.REQUESTOR_ROLE_PROPERTY;
-import static org.objectweb.celtix.ws.addressing.JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND;
-import static org.objectweb.celtix.ws.rm.JAXWSRMConstants.RM_PROPERTIES_OUTBOUND;
/**
* Test resend logic.
*/
-public class RetransmissionQueueTest extends TestCase {
+public class RetransmissionQueueImplTest extends TestCase {
private IMocksControl control;
- private PersistenceHandler handler;
- private WorkQueue workQueue;
+ private RMManager manager;
+ private Executor executor;
private RetransmissionQueueImpl queue;
private TestResender resender;
- private List<ObjectMessageContext> contexts =
- new ArrayList<ObjectMessageContext>();
+ private List<Message> messages =
+ new ArrayList<Message>();
private List<RMProperties> properties =
new ArrayList<RMProperties>();
private List<SequenceType> sequences =
@@ -71,16 +67,17 @@
public void setUp() {
control = EasyMock.createNiceControl();
- handler = createMock(PersistenceHandler.class);
- queue = new RetransmissionQueueImpl(handler);
+ manager = createMock(RMManager.class);
+ queue = new RetransmissionQueueImpl(manager);
resender = new TestResender();
queue.replaceResender(resender);
- workQueue = createMock(WorkQueue.class);
+ executor = createMock(Executor.class);
+
}
public void tearDown() {
control.verify();
- contexts.clear();
+ messages.clear();
properties.clear();
sequences.clear();
mocks.clear();
@@ -102,18 +99,14 @@
}
public void testCacheUnacknowledged() {
- ObjectMessageContext context1 = setUpContext("sequence1");
- ObjectMessageContext context2 = setUpContext("sequence2");
- ObjectMessageContext context3 = setUpContext("sequence1");
-
- setupContextMAPs(context1);
- setupContextMAPs(context2);
- setupContextMAPs(context3);
+ Message message1 = setUpMessage("sequence1");
+ Message message2 = setUpMessage("sequence2");
+ Message message3 = setUpMessage("sequence1");
ready();
assertNotNull("expected resend candidate",
- queue.cacheUnacknowledged(context1));
+ queue.cacheUnacknowledged(message1));
assertEquals("expected non-empty unacked map",
1,
queue.getUnacknowledged().size());
@@ -121,11 +114,11 @@
queue.getUnacknowledged().get("sequence1");
assertNotNull("expected non-null context list", sequence1List);
assertSame("expected context list entry",
- context1,
- sequence1List.get(0).getContext());
+ message1,
+ sequence1List.get(0).getMessage());
assertNotNull("expected resend candidate",
- queue.cacheUnacknowledged(context2));
+ queue.cacheUnacknowledged(message2));
assertEquals("unexpected unacked map size",
2,
queue.getUnacknowledged().size());
@@ -133,11 +126,11 @@
queue.getUnacknowledged().get("sequence2");
assertNotNull("expected non-null context list", sequence2List);
assertSame("expected context list entry",
- context2,
- sequence2List.get(0).getContext());
+ message2,
+ sequence2List.get(0).getMessage());
assertNotNull("expected resend candidate",
- queue.cacheUnacknowledged(context3));
+ queue.cacheUnacknowledged(message3));
assertEquals("un expected unacked map size",
2,
queue.getUnacknowledged().size());
@@ -145,8 +138,8 @@
queue.getUnacknowledged().get("sequence1");
assertNotNull("expected non-null context list", sequence1List);
assertSame("expected context list entry",
- context3,
- sequence1List.get(1).getContext());
+ message3,
+ sequence1List.get(1).getMessage());
}
public void testPurgeAcknowledgedSome() {
@@ -157,12 +150,12 @@
List<RetransmissionQueueImpl.ResendCandidate> sequenceList =
new ArrayList<RetransmissionQueueImpl.ResendCandidate>();
queue.getUnacknowledged().put("sequence1", sequenceList);
- ObjectMessageContext context1 =
- setUpContext("sequence1", messageNumbers[0]);
- sequenceList.add(queue.createResendCandidate(context1));
- ObjectMessageContext context2 =
- setUpContext("sequence1", messageNumbers[1]);
- sequenceList.add(queue.createResendCandidate(context2));
+ Message message1 =
+ setUpMessage("sequence1", messageNumbers[0]);
+ sequenceList.add(queue.createResendCandidate(message1));
+ Message message2 =
+ setUpMessage("sequence1", messageNumbers[1]);
+ sequenceList.add(queue.createResendCandidate(message2));
ready();
queue.purgeAcknowledged(sequence);
@@ -182,12 +175,12 @@
List<RetransmissionQueueImpl.ResendCandidate> sequenceList =
new ArrayList<RetransmissionQueueImpl.ResendCandidate>();
queue.getUnacknowledged().put("sequence1", sequenceList);
- ObjectMessageContext context1 =
- setUpContext("sequence1", messageNumbers[0]);
- sequenceList.add(queue.createResendCandidate(context1));
- ObjectMessageContext context2 =
- setUpContext("sequence1", messageNumbers[1]);
- sequenceList.add(queue.createResendCandidate(context2));
+ Message message1 =
+ setUpMessage("sequence1", messageNumbers[0]);
+ sequenceList.add(queue.createResendCandidate(message1));
+ Message message2 =
+ setUpMessage("sequence1", messageNumbers[1]);
+ sequenceList.add(queue.createResendCandidate(message2));
ready();
queue.purgeAcknowledged(sequence);
@@ -213,12 +206,12 @@
new ArrayList<RetransmissionQueueImpl.ResendCandidate>();
queue.getUnacknowledged().put("sequence1", sequenceList);
- ObjectMessageContext context1 =
- setUpContext("sequence1", messageNumbers[0], false);
- sequenceList.add(queue.createResendCandidate(context1));
- ObjectMessageContext context2 =
- setUpContext("sequence1", messageNumbers[1], false);
- sequenceList.add(queue.createResendCandidate(context2));
+ Message message1 =
+ setUpMessage("sequence1", messageNumbers[0], false);
+ sequenceList.add(queue.createResendCandidate(message1));
+ Message message2 =
+ setUpMessage("sequence1", messageNumbers[1], false);
+ sequenceList.add(queue.createResendCandidate(message2));
ready();
assertEquals("unexpected unacked count",
@@ -239,8 +232,9 @@
queue.countUnacknowledged(sequence));
}
- public void testPopulate() {
+ public void xtestPopulate() {
+ /*
Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
Collection<RMMessage> msgs = new ArrayList<RMMessage>();
// List<Handler> handlerChain = new ArrayList<Handler>();
@@ -260,22 +254,10 @@
MessageContext context = control.createMock(MessageContext.class);
msg.getContext();
EasyMock.expectLastCall().andReturn(context);
- /*
- AbstractBindingBase binding = control.createMock(AbstractBindingBase.class);
- handler.getBinding();
- EasyMock.expectLastCall().andReturn(binding).times(2);
- AbstractBindingImpl abi = control.createMock(AbstractBindingImpl.class);
- binding.getBindingImpl();
- EasyMock.expectLastCall().andReturn(abi).times(2);
- */
+
RMSoapHandler rmh = control.createMock(RMSoapHandler.class);
MAPCodec wsah = control.createMock(MAPCodec.class);
- /*
- handlerChain.add(rmh);
- handlerChain.add(wsah);
- abi.getPostProtocolSystemHandlers();
- EasyMock.expectLastCall().andReturn(handlerChain).times(2);
- */
+
handler.getWsaSOAPHandler();
EasyMock.expectLastCall().andReturn(wsah);
handler.getRMSoapHandler();
@@ -297,23 +279,22 @@
queue.populate(sss);
- assertTrue("queue is empty", !queue.isEmpty());
+ assertTrue("queue is empty", !queue.isEmpty());
+ */
}
public void testResendInitiatorBackoffLogic() {
- ObjectMessageContext context1 = setUpContext("sequence1");
- ObjectMessageContext context2 = setUpContext("sequence2");
- ObjectMessageContext context3 = setUpContext("sequence1");
- setupContextMAPs(context1);
- setupContextMAPs(context2);
- setupContextMAPs(context3);
+ Message message1 = setUpMessage("sequence1");
+ Message message2 = setUpMessage("sequence2");
+ Message message3 = setUpMessage("sequence1");
+
ready();
RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(context1);
+ queue.cacheUnacknowledged(message1);
RetransmissionQueueImpl.ResendCandidate candidate2 =
- queue.cacheUnacknowledged(context2);
+ queue.cacheUnacknowledged(message2);
RetransmissionQueueImpl.ResendCandidate candidate3 =
- queue.cacheUnacknowledged(context3);
+ queue.cacheUnacknowledged(message3);
RetransmissionQueueImpl.ResendCandidate[] allCandidates =
{candidate1, candidate2, candidate3};
boolean [] expectAckRequested = {true, true, false};
@@ -323,7 +304,7 @@
// all 3 candidates due
runInitiator(allCandidates);
- runCandidates(allCandidates, expectAckRequested);
+ runCandidates(allCandidates, expectAckRequested);
// exponential backoff => none due
runInitiator();
@@ -353,19 +334,16 @@
public void testResendInitiatorDueLogic() {
- ObjectMessageContext context1 = setUpContext("sequence1");
- ObjectMessageContext context2 = setUpContext("sequence2");
- ObjectMessageContext context3 = setUpContext("sequence1");
- setupContextMAPs(context1);
- setupContextMAPs(context2);
- setupContextMAPs(context3);
+ Message message1 = setUpMessage("sequence1");
+ Message message2 = setUpMessage("sequence2");
+ Message message3 = setUpMessage("sequence1");
ready();
RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(context1);
+ queue.cacheUnacknowledged(message1);
RetransmissionQueueImpl.ResendCandidate candidate2 =
- queue.cacheUnacknowledged(context2);
+ queue.cacheUnacknowledged(message2);
RetransmissionQueueImpl.ResendCandidate candidate3 =
- queue.cacheUnacknowledged(context3);
+ queue.cacheUnacknowledged(message3);
RetransmissionQueueImpl.ResendCandidate[] allCandidates =
{candidate1, candidate2, candidate3};
boolean [] expectAckRequested = {true, true, false};
@@ -404,19 +382,16 @@
}
public void testResendInitiatorResolvedLogic() {
- ObjectMessageContext context1 = setUpContext("sequence1");
- ObjectMessageContext context2 = setUpContext("sequence2");
- ObjectMessageContext context3 = setUpContext("sequence1");
- setupContextMAPs(context1);
- setupContextMAPs(context2);
- setupContextMAPs(context3);
+ Message message1 = setUpMessage("sequence1");
+ Message message2 = setUpMessage("sequence2");
+ Message message3 = setUpMessage("sequence1");
ready();
RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(context1);
+ queue.cacheUnacknowledged(message1);
RetransmissionQueueImpl.ResendCandidate candidate2 =
- queue.cacheUnacknowledged(context2);
+ queue.cacheUnacknowledged(message2);
RetransmissionQueueImpl.ResendCandidate candidate3 =
- queue.cacheUnacknowledged(context3);
+ queue.cacheUnacknowledged(message3);
RetransmissionQueueImpl.ResendCandidate[] allCandidates =
{candidate1, candidate2, candidate3};
boolean [] expectAckRequested = {true, true, false};
@@ -444,28 +419,29 @@
runInitiator();
}
- public void testResenderInitiatorNoRescheduleOnShutdown() {
+ public void xtestResenderInitiatorNoRescheduleOnShutdown() {
+ /*
ready();
queue.shutdown();
queue.getResendInitiator().run();
+ */
}
public void testDefaultResenderClient() throws Exception {
doTestDefaultResender(true);
}
- public void testDefaultResenderServer() throws Exception {
+ public void xtestDefaultResenderServer() throws Exception {
doTestDefaultResender(false);
}
private void doTestDefaultResender(boolean isRequestor) throws Exception {
- ObjectMessageContext context1 = setUpContext("sequence1");
- setupContextMAPs(context1);
+ Message message1 = setUpMessage("sequence1");
queue.replaceResender(queue.getDefaultResender());
ready();
RetransmissionQueueImpl.ResendCandidate candidate1 =
- queue.cacheUnacknowledged(context1);
+ queue.cacheUnacknowledged(message1);
RetransmissionQueueImpl.ResendCandidate[] allCandidates = {candidate1};
// initial run => none due
@@ -473,41 +449,36 @@
// single candidate due
runInitiator(allCandidates);
- setUpDefaultResender(0, isRequestor, context1);
+ setUpDefaultResender(0, isRequestor, message1);
allCandidates[0].run();
}
- private ObjectMessageContext setUpContext(String sid) {
- return setUpContext(sid, null);
+ private Message setUpMessage(String sid) {
+ return setUpMessage(sid, null);
}
- private ObjectMessageContext setUpContext(String sid,
+ private Message setUpMessage(String sid,
BigInteger messageNumber) {
- return setUpContext(sid, messageNumber, true);
+ return setUpMessage(sid, messageNumber, true);
}
- private ObjectMessageContext setUpContext(String sid,
+ private Message setUpMessage(String sid,
BigInteger messageNumber,
boolean storeSequence) {
- ObjectMessageContext context =
- createMock(ObjectMessageContext.class);
+ Message message =
+ createMock(Message.class);
if (storeSequence) {
- setUpSequenceType(context, sid, messageNumber);
+ setUpSequenceType(message, sid, messageNumber);
}
- contexts.add(context);
+ messages.add(message);
- return context;
- }
-
- private void setupContextMAPs(ObjectMessageContext context) {
- AddressingPropertiesImpl maps = createMock(AddressingPropertiesImpl.class);
- context.get(CLIENT_ADDRESSING_PROPERTIES_OUTBOUND);
- EasyMock.expectLastCall().andReturn(maps);
+ return message;
}
-
+
+ /*
private void setupContextMessage(ObjectMessageContext context) throws Exception {
SOAPMessage message = createMock(SOAPMessage.class);
- context.get("org.objectweb.celtix.bindings.soap.message");
+ context.get("org.apache.cxf.bindings.soap.message");
EasyMock.expectLastCall().andReturn(message);
SOAPPart part = createMock(SOAPPart.class);
message.getSOAPPart();
@@ -549,81 +520,60 @@
headerElements.hasNext();
EasyMock.expectLastCall().andReturn(false);
}
+ */
private void ready() {
control.replay();
- queue.start(workQueue);
+ queue.start();
}
private void setUpDefaultResender(int i,
boolean isRequestor,
- ObjectMessageContext context)
+ Message context)
throws Exception {
- assertTrue("too few contexts", i < contexts.size());
+ assertTrue("too few contexts", i < messages.size());
assertTrue("too few properties", i < properties.size());
assertTrue("too few sequences", i < sequences.size());
control.verify();
- control.reset();
+ control.reset();
- contexts.get(i).get(RM_PROPERTIES_OUTBOUND);
- EasyMock.expectLastCall().andReturn(properties.get(i)).times(2);
+ messages.get(i).get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
+ EasyMock.expectLastCall().andReturn(properties.get(i)).times(1);
properties.get(i).getSequence();
- EasyMock.expectLastCall().andReturn(sequences.get(i)).times(2);
-
- setupContextMessage(context);
+ EasyMock.expectLastCall().andReturn(sequences.get(i)).times(1);
- contexts.get(i).get(REQUESTOR_ROLE_PROPERTY);
+ messages.get(i).get(Message.REQUESTOR_ROLE);
EasyMock.expectLastCall().andReturn(Boolean.valueOf(isRequestor));
- sequences.get(i).getIdentifier();
- EasyMock.expectLastCall().andReturn(identifiers.get(i));
- Transport transport = isRequestor
- ? createMock(ClientTransport.class)
- : createMock(ServerTransport.class);
- if (isRequestor) {
- handler.getClientTransport();
- EasyMock.expectLastCall().andReturn(transport).times(2);
- } else {
- handler.getServerTransport();
- EasyMock.expectLastCall().andReturn(transport).times(1);
- }
- AbstractBindingBase binding =
- createMock(AbstractBindingBase.class);
- handler.getBinding();
- EasyMock.expectLastCall().andReturn(binding);
- HandlerInvoker handlerInvoker =
- createMock(HandlerInvoker.class);
- binding.createHandlerInvoker();
- EasyMock.expectLastCall().andReturn(handlerInvoker);
- AbstractBindingImpl bindingImpl =
- createMock(AbstractBindingImpl.class);
- binding.getBindingImpl();
- EasyMock.expectLastCall().andReturn(bindingImpl).times(isRequestor
- ? 6
- : 5);
- bindingImpl.createBindingMessageContext(contexts.get(i));
- MessageContext bindingContext =
- createMock(MessageContext.class);
- EasyMock.expectLastCall().andReturn(bindingContext);
-
- OutputStreamMessageContext outputStreamContext =
- createMock(OutputStreamMessageContext.class);
- transport.createOutputStreamContext(bindingContext);
- EasyMock.expectLastCall().andReturn(outputStreamContext);
if (isRequestor) {
-
- setUpClientDispatch(handlerInvoker,
- binding,
- outputStreamContext,
- bindingImpl,
- transport);
- } else {
- setUpServerDispatch(bindingContext, outputStreamContext);
+ Exchange ex = createMock(Exchange.class);
+ messages.get(i).getExchange();
+ EasyMock.expectLastCall().andReturn(ex);
+ Conduit conduit = createMock(Conduit.class);
+ ex.getConduit();
+ EasyMock.expectLastCall().andReturn(conduit);
+ conduit.send(messages.get(i));
+ EasyMock.expectLastCall();
+ OutputStream os = createMock(OutputStream.class);
+ messages.get(i).getContent(OutputStream.class);
+ EasyMock.expectLastCall().andReturn(os).times(2);
+ ByteArrayOutputStream saved = createMock(ByteArrayOutputStream.class);
+ messages.get(i).get(RMMessageConstants.SAVED_OUTPUT_STREAM);
+ EasyMock.expectLastCall().andReturn(saved);
+ byte[] content = "the saved message".getBytes();
+ saved.toByteArray();
+ EasyMock.expectLastCall().andReturn(content);
+ os.write(EasyMock.isA(byte[].class), EasyMock.eq(0), EasyMock.eq(content.length));
+ EasyMock.expectLastCall();
+ os.flush();
+ EasyMock.expectLastCall();
+ os.close();
+ EasyMock.expectLastCall();
}
-
control.replay();
}
+ /*
private void setUpClientDispatch(
HandlerInvoker handlerInvoker,
AbstractBindingBase binding,
@@ -653,7 +603,9 @@
bindingImpl.unmarshal(bindingContext, objectContext, null);
EasyMock.expectLastCall();
}
+ */
+ /*
private void setUpServerDispatch(
MessageContext bindingContext,
OutputStreamMessageContext outputStreamContext) {
@@ -665,6 +617,7 @@
outputStreamContext.getOutputStream();
EasyMock.expectLastCall().andReturn(outputStream);
}
+ */
private void runInitiator() {
runInitiator(null);
@@ -674,18 +627,22 @@
RetransmissionQueueImpl.ResendCandidate[] dueCandidates) {
control.verify();
control.reset();
-
+
for (int i = 0;
dueCandidates != null && i < dueCandidates.length;
i++) {
- workQueue.execute(dueCandidates[i]);
+ Exchange ex = createMock(Exchange.class);
+ dueCandidates[i].getMessage().getExchange();
+ EasyMock.expectLastCall().andReturn(ex);
+ Endpoint ep = createMock(Endpoint.class);
+ ex.get(Endpoint.class);
+ EasyMock.expectLastCall().andReturn(ep);
+ ep.getExecutor();
+ EasyMock.expectLastCall().andReturn(executor);
+ executor.execute(dueCandidates[i]);
EasyMock.expectLastCall();
}
- /*
- workQueue.schedule(queue.getResendInitiator(),
- queue.getBaseRetransmissionInterval());
- EasyMock.expectLastCall();
- */
+
control.replay();
queue.getResendInitiator().run();
}
@@ -699,23 +656,23 @@
expectAckRequested[i],
resender.includeAckRequested);
assertSame("unexpected context",
- candidates[i].getContext(),
- resender.context);
+ candidates[i].getMessage(),
+ resender.message);
resender.clear();
}
}
- private SequenceType setUpSequenceType(ObjectMessageContext context,
+ private SequenceType setUpSequenceType(Message message,
String sid,
BigInteger messageNumber) {
RMProperties rmps = createMock(RMProperties.class);
- if (context != null) {
- context.get(RM_PROPERTIES_OUTBOUND);
+ if (message != null) {
+ message.get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
EasyMock.expectLastCall().andReturn(rmps);
}
properties.add(rmps);
SequenceType sequence = createMock(SequenceType.class);
- if (context != null) {
+ if (message != null) {
rmps.getSequence();
EasyMock.expectLastCall().andReturn(sequence);
}
@@ -757,7 +714,7 @@
sequence.getIdentifier();
EasyMock.expectLastCall().andReturn(id);
RMStore store = createMock(RMStore.class);
- handler.getStore();
+ manager.getStore();
EasyMock.expectLastCall().andReturn(store);
}
return sequence;
@@ -778,16 +735,16 @@
}
private static class TestResender implements RetransmissionQueueImpl.Resender {
- ObjectMessageContext context;
+ Message message;
boolean includeAckRequested;
- public void resend(ObjectMessageContext ctx, boolean requestAcknowledge) {
- context = ctx;
+ public void resend(Message ctx, boolean requestAcknowledge) {
+ message = ctx;
includeAckRequested = requestAcknowledge;
}
void clear() {
- context = null;
+ message = null;
includeAckRequested = false;
}
};
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java Tue Nov 21 06:50:53 2006
@@ -33,6 +33,8 @@
import org.apache.cxf.greeter_control.Control;
import org.apache.cxf.greeter_control.types.StartGreeterResponse;
import org.apache.cxf.greeter_control.types.StopGreeterResponse;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RetransmissionQueue;
@WebService(serviceName = "ControlService",
@@ -73,6 +75,11 @@
}
endpoint = null;
if (null != greeterBus) {
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ RetransmissionQueue queue = manager.getRetransmissionQueue();
+ if (null != queue) {
+ queue.stop();
+ }
greeterBus.shutdown(true);
}
return true;
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java Tue Nov 21 06:50:53 2006
@@ -410,6 +410,13 @@
}
}
+ public void purge() {
+ inboundMessages.clear();
+ outboundMessages.clear();
+ inStreams.clear();
+ outStreams.clear();
+ }
+
public void verifyPartialResponses(int nExpected) throws Exception {
verifyPartialResponses(nExpected, null);
}
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java Tue Nov 21 06:50:53 2006
@@ -50,7 +50,6 @@
public OutMessageRecorder() {
outbound = new ArrayList<byte[]>();
setPhase(Phase.PRE_PROTOCOL);
- // setPhase(Phase.POST_STREAM);
}
public void handleMessage(Message message) throws Fault {
Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Tue Nov 21 06:50:53 2006
@@ -35,6 +35,7 @@
import org.apache.cxf.systest.common.ClientServerTestBase;
import org.apache.cxf.ws.rm.RMConstants;
import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RetransmissionQueue;
/**
@@ -67,6 +68,7 @@
private boolean doTestOnewayDeferredAnonymousAcks = testAll;
private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
private boolean doTestOnewayAnonymousAcksSequenceLength1 = testAll;
+ private boolean doTestOnewayAnonymousAcksSupressed = testAll;
private boolean doTestTwowayNonAnonymous = testAll;
private boolean doTestTwowayNonAnonymousDeferred = testAll;
private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
@@ -101,11 +103,17 @@
public void tearDown() {
if (null != greeter) {
- assertTrue("Failed to stop greeter.", control.stopGreeter());
+ assertTrue("Failed to stop greeter.", control.stopGreeter());
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ RetransmissionQueue queue = manager.getRetransmissionQueue();
+ if (null != queue) {
+ queue.stop();
+ }
greeterBus.shutdown(true);
greeterBus = null;
}
- if (null != control) {
+ if (null != control) {
+ assertTrue("Failed to stop greeter", control.stopGreeter());
controlBus.shutdown(true);
}
}
@@ -271,6 +279,53 @@
mf.verifyMessageNumbers(new String[] {null, null, null, null, null, null}, false);
mf.verifyLastMessage(new boolean[] {false, false, false, false, false, false}, false);
mf.verifyAcknowledgements(new boolean[] {false, true, false, false, true, false}, false);
+ }
+
+ public void testOnewayAnonymousAcksSupressed() throws Exception {
+
+ if (!doTestOnewayAnonymousAcksSupressed) {
+ return;
+ }
+ setupGreeter("org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml");
+
+ greeter.greetMeOneWay("once");
+ greeter.greetMeOneWay("twice");
+ greeter.greetMeOneWay("thrice");
+
+ // three application messages plus createSequence
+
+ awaitMessages(4, 4, 2000);
+
+ MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+
+ mf.verifyMessages(4, true);
+ String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(),
+ GREETMEONEWAY_ACTION,
+ GREETMEONEWAY_ACTION,
+ GREETMEONEWAY_ACTION};
+ mf.verifyActions(expectedActions, true);
+ mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+
+ // createSequenceResponse plus 3 partial responses, none of which
+ // contain an acknowledgment
+
+ mf.verifyMessages(4, false);
+ mf.verifyPartialResponses(3, new boolean[3]);
+ mf.purgePartialResponses();
+
+ expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction()};
+ mf.verifyActions(expectedActions, false);
+
+ mf.purge();
+ assertEquals(0, outRecorder.getOutboundMessages().size());
+ assertEquals(0, inRecorder.getInboundMessages().size());
+
+ // allow resends to kick in
+ // await multiple of 3 resends to avoid shutting down server
+ // in the course of retransmission - this is harmless but pollutes test output
+
+ awaitMessages(3, 0, 5000);
+
}
public void testTwowayNonAnonymous() throws Exception {
Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml?view=auto&rev=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml Tue Nov 21 06:50:53 2006
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
+ xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+ xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <import resource="rminterceptors.xml"/>
+
+ <bean id="org.apache.cxf.ws.rm.RMManager" class="org.apache.cxf.ws.rm.RMManager">
+ <property name="bus" ref="cxf"/>
+ <property name="destinationPolicy">
+ <value>
+ <wsrm-mgmt:destinationPolicy>
+ <wsrm-mgmt:acksPolicy intraMessageThreshold="0"/>
+ </wsrm-mgmt:destinationPolicy>
+ </value>
+ </property>
+
+ <property name="RMAssertion">
+ <value>
+ <wsrm-policy:RMAssertion>
+ <wsrm-policy:BaseRetransmissionInterval Milliseconds="3000"/>
+ <wsrm-policy:AcknowledgementInterval Milliseconds="99999999"/>
+ </wsrm-policy:RMAssertion>
+ </value>
+ </property>
+ </bean>
+
+</beans>
\ No newline at end of file
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml