You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2006/11/21 15:50:54 UTC

svn commit: r477690 - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/io/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/a...

Author: andreasmyth
Date: Tue Nov 21 06:50:53 2006
New Revision: 477690

URL: http://svn.apache.org/viewvc?view=rev&rev=477690
Log:
[JIRA CXF-138, CXF-140] RetransmissionQueue implementation (client side resends only at the moment)
Added system test testOnewayAnonymousAcksSuppressed.

Added:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
      - copied, changed from r477271, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml   (with props)
Modified:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractCachedOutputStream.java Tue Nov 21 06:50:53 2006
@@ -34,6 +34,7 @@
 import java.io.PipedOutputStream;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cxf.common.util.Base64Utility;
@@ -69,6 +70,16 @@
             callbacks = new ArrayList<CachedOutputStreamCallback>();
         }
         callbacks.add(cb);
+    }
+    
+    public void deregisterCallback(CachedOutputStreamCallback cb) {
+        if (null != callbacks) {
+            callbacks.remove(cb);
+        }
+    }
+
+    public List<CachedOutputStreamCallback> getCallbacks() {
+        return callbacks == null ? null : Collections.unmodifiableList(callbacks);
     }
 
     /**

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Tue Nov 21 06:50:53 2006
@@ -19,14 +19,12 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.phase.PhaseInterceptor;
@@ -94,31 +92,5 @@
     // rm logic
     
     abstract void handleMessage(Message msg, boolean isFault) throws SequenceFault;
-    
-    protected boolean isAplicationMessage(String action) {
-        if (RMConstants.getCreateSequenceAction().equals(action)
-            || RMConstants.getCreateSequenceResponseAction().equals(action)
-            || RMConstants.getTerminateSequenceAction().equals(action)
-            || RMConstants.getLastMessageAction().equals(action)
-            || RMConstants.getSequenceAcknowledgmentAction().equals(action)
-            || RMConstants.getSequenceInfoAction().equals(action)) {
-            return false;
-        }
-        return true;
-    }
-    
-    protected boolean isPartialResponse(Message msg) {
-        return RMContextUtils.isOutbound(msg) 
-            && msg.getContent(List.class) == null
-            && getException(msg.getExchange()) == null;   
-    }
-    
-    private Exception getException(Exchange exchange) {
-        if (exchange.getOutFaultMessage() != null) {
-            return exchange.getOutFaultMessage().getContent(Exception.class);
-        } else if (exchange.getInFaultMessage() != null) {
-            return exchange.getInFaultMessage().getContent(Exception.class);
-        }
-        return null;
-    }
+   
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Tue Nov 21 06:50:53 2006
@@ -19,7 +19,10 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.util.List;
+
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
@@ -31,12 +34,12 @@
 
 public final class RMContextUtils {
 
-    /** 
+    /**
      * Prevents instantiation.
      */
     protected RMContextUtils() {
     }
-    
+
     /**
      * @return a generated UUID
      */
@@ -44,7 +47,6 @@
         return org.apache.cxf.ws.addressing.ContextUtils.generateUUID();
     }
 
-    
     /**
      * Determine if message is outbound.
      * 
@@ -54,7 +56,6 @@
     public static boolean isOutbound(Message message) {
         return org.apache.cxf.ws.addressing.ContextUtils.isOutbound(message);
     }
-    
 
     /**
      * Determine if current messaging role is that of requestor.
@@ -65,7 +66,7 @@
     public static boolean isRequestor(Message message) {
         return org.apache.cxf.ws.addressing.ContextUtils.isRequestor(message);
     }
-    
+
     /**
      * Determine if message is currently being processed on server side.
      * 
@@ -76,13 +77,44 @@
         if (isOutbound(message)) {
             return message.getExchange().getInMessage() != null;
         } else {
-            return message.getExchange().getOutMessage() == null 
-                && message.getExchange().getOutFaultMessage() == null;
+            return message.getExchange().getOutMessage() == null
+                   && message.getExchange().getOutFaultMessage() == null;
         }
     }
-    
+
+    /**
+     * Checks if the message is a partial response to a oneway request.
+     * 
+     * @param message the message
+     * @return true iff the message is a partial response to a oneway request
+     */
+    public static boolean isPartialResponse(Message message) {
+        return RMContextUtils.isOutbound(message) 
+            && message.getContent(List.class) == null
+            && getException(message.getExchange()) == null; 
+    }
+
+    /**
+     * Checks if the action String belongs to an application message.
+     * 
+     * @param action the action
+     * @return true iff the action is not one of the RM protocol actions.
+     */
+    public static boolean isAplicationMessage(String action) {
+        if (RMConstants.getCreateSequenceAction().equals(action)
+            || RMConstants.getCreateSequenceResponseAction().equals(action)
+            || RMConstants.getTerminateSequenceAction().equals(action)
+            || RMConstants.getLastMessageAction().equals(action)
+            || RMConstants.getSequenceAcknowledgmentAction().equals(action)
+            || RMConstants.getSequenceInfoAction().equals(action)) {
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Retrieve the RM properties from the current message.
+     * 
      * @param message the current message
      * @param outbound true iff the message direction is outbound
      * @return the RM properties
@@ -106,11 +138,12 @@
             }
         }
         return null;
-        
+
     }
-    
+
     /**
      * Store the RM properties in the current message.
+     * 
      * @param message the current message
      * @param rmps the RM properties
      * @param outbound iff the message direction is outbound
@@ -119,7 +152,7 @@
         String key = getRMPropertiesKey(outbound);
         message.put(key, rmps);
     }
-    
+
     /**
      * Retrieves the addressing properties from the current message.
      * 
@@ -131,22 +164,22 @@
      * @return the current addressing properties
      */
     public static AddressingPropertiesImpl retrieveMAPs(Message message, boolean isProviderContext,
-                                                    boolean isOutbound) {
+                                                        boolean isOutbound) {
         return org.apache.cxf.ws.addressing.ContextUtils.retrieveMAPs(message, isProviderContext, isOutbound);
     }
-    
+
     /**
      * Store MAPs in the message.
-     *
+     * 
      * @param maps the MAPs to store
      * @param message the current message
      * @param isOutbound true iff the message is outbound
      * @param isRequestor true iff the current messaging role is that of
-     * requestor
+     *            requestor
      * @param handler true if HANDLER scope, APPLICATION scope otherwise
      */
     public static void storeMAPs(AddressingProperties maps, Message message, boolean isProviderContext,
-                                                        boolean isOutbound) {
+                                 boolean isOutbound) {
         org.apache.cxf.ws.addressing.ContextUtils.storeMAPs(maps, message, isProviderContext, isOutbound);
     }
 
@@ -167,12 +200,21 @@
      * @param message the current Message
      * @return the endpoint
      */
-    public static Endpoint getEndpoint(Message message) {        
+    public static Endpoint getEndpoint(Message message) {
         return message.getExchange().get(Endpoint.class);
     }
-    
+
     private static String getRMPropertiesKey(boolean outbound) {
-        return outbound ? RMMessageConstants.RM_PROPERTIES_OUTBOUND 
-            : RMMessageConstants.RM_PROPERTIES_INBOUND;
+        return outbound
+            ? RMMessageConstants.RM_PROPERTIES_OUTBOUND : RMMessageConstants.RM_PROPERTIES_INBOUND;
+    }
+    
+    private static Exception getException(Exchange exchange) {
+        if (exchange.getOutFaultMessage() != null) {
+            return exchange.getOutFaultMessage().getContent(Exception.class);
+        } else if (exchange.getInFaultMessage() != null) {
+            return exchange.getInFaultMessage().getContent(Exception.class);
+        }
+        return null;
     }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Tue Nov 21 06:50:53 2006
@@ -72,7 +72,7 @@
             bus.setExtension(this, RMManager.class);
         }
         if (null == retransmissionQueue) {
-            retransmissionQueue = new RetransmissionQueueImpl();
+            retransmissionQueue = new RetransmissionQueueImpl(this);
         }
     }
 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Tue Nov 21 06:50:53 2006
@@ -39,6 +39,9 @@
     public static final String ORIGINAL_REQUESTOR_ROLE =
         "org.apache.cxf.client.original";
     
+    public static final String SAVED_OUTPUT_STREAM =
+        "org.apache.cxf.ws.rm.outputstream";
+    
     /**
      * Prevents instantiation. 
      */

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Tue Nov 21 06:50:53 2006
@@ -73,7 +73,24 @@
             LOG.fine("Action: " + action);
         }
 
-        boolean isApplicationMessage = isAplicationMessage(action);
+        boolean isApplicationMessage = RMContextUtils.isAplicationMessage(action);
+        boolean isPartialResponse = RMContextUtils.isPartialResponse(message);
+        LOG.fine("isApplicationMessage: " + isApplicationMessage);
+        LOG.fine("isPartialResponse: " + isPartialResponse);
+        
+        if (isApplicationMessage && !isPartialResponse) {
+            RetransmissionInterceptor ri = new RetransmissionInterceptor();
+            ri.setManager(getManager());
+            // TODO:
+            // On the server side: If a fault occurs after this interceptor we will switch 
+            // interceptor chains (if this is not already a fault message) and therefore need to 
+            // make sure the retransmission interceptor is added to the fault chain
+            // 
+            message.getInterceptorChain().add(ri);
+            LOG.fine("Added RetransmissionInterceptor to chain.");
+            
+            getManager().getRetransmissionQueue().start();
+        }
         
         RMProperties rmpsOut = (RMProperties)RMContextUtils.retrieveRMProperties(message, true);
         if (null == rmpsOut) {
@@ -85,7 +102,7 @@
         Identifier inSeqId = null;
         BigInteger inMessageNumber = null;
         
-        if (isApplicationMessage && !isPartialResponse(message)) {
+        if (isApplicationMessage && !isPartialResponse) {
                         
             rmpsIn = (RMProperties)RMContextUtils.retrieveRMProperties(message, false);
             

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java?view=auto&rev=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java Tue Nov 21 06:50:53 2006
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.io.CachedOutputStreamCallback;
+import org.apache.cxf.message.Message;
+
+/**
+ * 
+ */
+public class RetransmissionCallback implements CachedOutputStreamCallback {
+    
+    Message message;
+    RMManager manager;
+    
+    RetransmissionCallback(Message m, RMManager mgr) {
+        message = m;
+        manager = mgr;
+    }
+    public void onClose(AbstractCachedOutputStream cos) {
+        // no-op
+    }
+
+    public void onFlush(AbstractCachedOutputStream cos) {
+        OutputStream os = cos.getOut();
+        if (os instanceof ByteArrayOutputStream) {
+            ByteArrayOutputStream bos = (ByteArrayOutputStream)os;
+            message.put(RMMessageConstants.SAVED_OUTPUT_STREAM, bos);  
+            manager.getRetransmissionQueue().addUnacknowledged(message);
+        }
+    }
+}

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java?view=auto&rev=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java Tue Nov 21 06:50:53 2006
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.io.OutputStream;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * 
+ */
+public class RetransmissionInterceptor extends AbstractPhaseInterceptor {
+ 
+    RMManager manager;
+      
+    public RMManager getManager() {
+        return manager;
+    }
+
+    public void setManager(RMManager manager) {
+        this.manager = manager;
+    }
+
+    @Override
+    public String getPhase() {
+        return Phase.PRE_PROTOCOL;
+    }
+
+    public void handleMessage(Message message) throws Fault {
+        handle(message, false);
+    }
+    
+    @Override
+    public void handleFault(Message message) {
+        handle(message, true);
+    }
+
+    public String getId() {
+        return RetransmissionInterceptor.class.getName();
+    }
+    
+    void handle(Message message, boolean isFault) {
+        
+        if (null == getManager().getRetransmissionQueue()) {
+            return;
+        }
+          
+        OutputStream os = message.getContent(OutputStream.class);
+        if (null == os) {
+            return;
+        }
+        
+        if (os instanceof AbstractCachedOutputStream) {
+            ((AbstractCachedOutputStream)os).registerCallback(
+                new RetransmissionCallback(message, getManager()));
+        }
+    }
+}
+    
+    
+
+   

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Nov 21 06:50:53 2006
@@ -19,30 +19,98 @@
 
 package org.apache.cxf.ws.rm.soap;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.io.CachedOutputStreamCallback;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.RetransmissionCallback;
 import org.apache.cxf.ws.rm.RetransmissionQueue;
+import org.apache.cxf.ws.rm.SequenceType;
 import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMStore;
 
 /**
  * 
  */
 public class RetransmissionQueueImpl implements RetransmissionQueue {
 
+    private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionQueueImpl.class);
+    
+    private long baseRetransmissionInterval = 3000L;
+    private int exponentialBackoff = 2;
+    private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
+    private Resender resender;
+    private Runnable resendInitiator;
+    private Timer timer;
+    private RMManager manager;
+    
+    public RetransmissionQueueImpl(RMManager m) {
+        manager = m;
+    }
+      
+    public RMManager getManager() {
+        return manager;
+    }
+
+    public void setManager(RMManager m) {
+        manager = m;
+    }
+
+    public long getBaseRetransmissionInterval() {
+        return baseRetransmissionInterval;
+    }
+
+    public void setBaseRetransmissionInterval(long baseRetransmissionInterval) {
+        this.baseRetransmissionInterval = baseRetransmissionInterval;
+    }
+
+    public void setExponentialBackoff(int exponentialBackoff) {
+        this.exponentialBackoff = exponentialBackoff;
+    }
+
     public void addUnacknowledged(Message message) {
-        // TODO Auto-generated method stub
-        
+        cacheUnacknowledged(message);        
     }
 
-    public int countUnacknowledged(SourceSequence seq) {
-        // TODO Auto-generated method stub
-        return 0;
+    /**
+     * @param seq the sequence under consideration
+     * @return the number of unacknowledged messages for that sequence
+     */
+    public synchronized int countUnacknowledged(SourceSequence seq) {
+        List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+        return sequenceCandidates == null ? 0 : sequenceCandidates.size();
     }
 
-    public boolean isEmpty() {
-        // TODO Auto-generated method stub
-        return false;
+    /**
+     * @return true if there are no unacknowledged messages in the queue
+     */
+    public boolean isEmpty() {  
+        return 0 == getUnacknowledged().size();
     }
 
     public void populate(Collection<SourceSequence> sss) {
@@ -50,19 +118,373 @@
         
     }
 
+    /**
+     * Purge all candidates for the given sequence that have been acknowledged.
+     * 
+     * @param seq the sequence object.
+     */
     public void purgeAcknowledged(SourceSequence seq) {
-        // TODO Auto-generated method stub
-        
+        Collection<BigInteger> purged = new ArrayList<BigInteger>();
+        synchronized (this) {
+            List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
+            if (null != sequenceCandidates) {
+                for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
+                    ResendCandidate candidate = sequenceCandidates.get(i);
+                    RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(),
+                                                                                  true);
+                    SequenceType st = properties.getSequence();
+                    BigInteger m = st.getMessageNumber();
+                    if (seq.isAcknowledged(m)) {
+                        sequenceCandidates.remove(i);
+                        candidate.resolved();
+                        purged.add(m);
+                    }
+                }
+            }
+        }
+        if (purged.size() > 0) {
+            RMStore store = manager.getStore();
+            if (null != store) {
+                store.removeMessages(seq.getIdentifier(), purged, true);
+            }
+        }
     }
 
+    /**
+     * Initiate resends.
+     * 
+     * @param queue the work queue providing async execution
+     */
     public void start() {
-        // TODO Auto-generated method stub
+        if (null != timer) {
+            return;
+        }
+        LOG.fine("Starting retransmission queue");
+        
+        // setup resender
+        if (null == resender) {
+            resender = getDefaultResender();
+        }
         
+        // start resend initiator
+        TimerTask task = new TimerTask() {
+            public void run() {
+                getResendInitiator().run();
+            }
+        };
+        timer = new Timer();
+        // TODO
+        // delay starting the queue to give the first request a chance to be sent before 
+        // waiting for another period.
+        timer.schedule(task, getBaseRetransmissionInterval() / 2, getBaseRetransmissionInterval());  
     }
 
+    /**
+     * Stops retransmission queue.
+     */ 
     public void stop() {
-        // TODO Auto-generated method stub
-        
+        if (null != timer) {
+            LOG.fine("Stopping retransmission queue");
+            timer.cancel();
+            timer = null;
+        }  
+    }
+    
+    /**
+     * @return the exponential backoff
+     */
+    protected int getExponentialBackoff() {
+        return exponentialBackoff;
+    }
+    
+    /**
+     * @return the ResendInitiator
+     */
+    protected Runnable getResendInitiator() {
+        if (resendInitiator == null) {
+            resendInitiator = new ResendInitiator();
+        }
+        return resendInitiator;
+    }
+    
+    /**
+     * @param message the message context
+     * @return a ResendCandidate
+     */
+    protected ResendCandidate createResendCandidate(Message message) {
+        return new ResendCandidate(message);
+    }
+    
+    /**
+     * Accepts a new resend candidate.
+     * 
+     * @param ctx the message context.
+     * @return ResendCandidate
+     */
+    protected ResendCandidate cacheUnacknowledged(Message message) {
+        ResendCandidate candidate = null;
+        RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);        
+        SequenceType st = rmps.getSequence();
+        Identifier sid = st.getIdentifier();
+        synchronized (this) {
+            String key = sid.getValue();
+            List<ResendCandidate> sequenceCandidates = getSequenceCandidates(key);
+            if (null == sequenceCandidates) {
+                sequenceCandidates = new ArrayList<ResendCandidate>();
+                candidates.put(key, sequenceCandidates);
+            }
+            candidate = new ResendCandidate(message);
+            sequenceCandidates.add(candidate);
+        }
+        LOG.fine("Cached unacknowledged message.");
+        return candidate;
+    }
+    
+    /**
+     * @return a map relating sequence ID to a lists of un-acknowledged messages
+     *         for that sequence
+     */
+    protected Map<String, List<ResendCandidate>> getUnacknowledged() {
+        return candidates;
+    }
+
+    /**
+     * @param seq the sequence under consideration
+     * @return the list of resend candidates for that sequence
+     * @pre called with mutex held
+     */
+    protected List<ResendCandidate> getSequenceCandidates(SourceSequence seq) {
+        return getSequenceCandidates(seq.getIdentifier().getValue());
+    }
+    
+    /**
+     * @param key the sequence identifier under consideration
+     * @return the list of resend candidates for that sequence
+     * @pre called with mutex held
+     */
+    protected List<ResendCandidate> getSequenceCandidates(String key) {
+        return candidates.get(key);
+    }
+    
+    private void clientResend(Message message) {        
+        Conduit c = message.getExchange().getConduit();
+        try {
+            
+            // get registered callbacks, create new output stream and re-register
+            // all callbacks except the retransmission callback
+            
+            OutputStream os = message.getContent(OutputStream.class);
+            List<CachedOutputStreamCallback> callbacks = null;            
+            if (os instanceof AbstractCachedOutputStream) {
+                callbacks = ((AbstractCachedOutputStream)os).getCallbacks();
+            }
+            
+            c.send(message);
+            
+            os = message.getContent(OutputStream.class);
+            if (os instanceof AbstractCachedOutputStream && callbacks.size() > 1) {
+                for (CachedOutputStreamCallback cb : callbacks) {
+                    if (!(cb instanceof RetransmissionCallback)) {
+                        ((AbstractCachedOutputStream)os).registerCallback(cb);
+                    }
+                }
+            }
+            ByteArrayOutputStream savedOutputStream = 
+                (ByteArrayOutputStream)message.get(RMMessageConstants.SAVED_OUTPUT_STREAM);
+            ByteArrayInputStream bis = new ByteArrayInputStream(savedOutputStream.toByteArray());
+            
+            // copy saved output stream to new output stream in chunks of 1024
+            AbstractCachedOutputStream.copyStream(bis, os, 1024);
+            os.flush();
+            os.close();
+        } catch (IOException ex) {
+            LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex); 
+        }
+    }
+    
+    private void serverResend(Message message) {
+        // TODO
+    }
+    
+    /**
+     * Manages scheduling of resend attempts. A single task runs every base
+     * transmission interval, determining which resend candidates are due a
+     * resend attempt.
+     */
+    protected class ResendInitiator implements Runnable {
+        public void run() {
+            // iterate over resend candidates, resending any that are due
+            synchronized (RetransmissionQueueImpl.this) {
+                Iterator<Map.Entry<String, List<ResendCandidate>>> sequences = candidates.entrySet()
+                    .iterator();
+                while (sequences.hasNext()) {
+                    Iterator<ResendCandidate> sequenceCandidates = sequences.next().getValue().iterator();
+                    boolean requestAck = true;
+                    while (sequenceCandidates.hasNext()) {
+                        ResendCandidate candidate = sequenceCandidates.next();
+                        if (candidate.isDue()) {
+                            candidate.initiate(requestAck);
+                            requestAck = false;
+                        }
+                    }
+                }
+            }
+        }
+    }
+    
+    /**
+     * Represents a candidate for resend, i.e. an unacked outgoing message. When
+     * this is determined as due another resend attempt, an asynchronous task is
+     * scheduled for this purpose.
+     */
+    protected class ResendCandidate implements Runnable {
+        private Message message;
+        private int skips;
+        private int skipped;
+        private boolean pending;
+        private boolean includeAckRequested;
+
+        /**
+         * @param ctx message context for the unacked message
+         */
+        protected ResendCandidate(Message m) {
+            message = m;
+            skipped = -1;
+            skips = 1;
+        }
+
+        /**
+         * Async resend logic.
+         */
+        public void run() {
+            try {
+                // ensure ACK wasn't received while this task was enqueued
+                // on executor
+                if (isPending()) {
+                    resender.resend(message, includeAckRequested);
+                    includeAckRequested = false;
+                }
+            } finally {
+                attempted();
+            }
+        }
+
+        /**
+         * @return true if candidate is due a resend REVISIT should bound the
+         *         max number of resend attampts
+         */
+        protected synchronized boolean isDue() {
+            boolean due = false;
+            // skip count is used to model exponential backoff
+            // to avoid gratuitous time evaluation
+            if (!pending && ++skipped == skips) {
+                skips *= getExponentialBackoff();
+                skipped = 0;
+                due = true;
+            }
+            return due;
+        }
+
+        /**
+         * @return if resend attempt is pending
+         */
+        protected synchronized boolean isPending() {
+            return pending;
+        }
+
+        /**
+         * Initiate resend asynchronsly.
+         * 
+         * @param requestAcknowledge true if a AckRequest header is to be sent
+         *            with resend
+         */
+        protected synchronized void initiate(boolean requestAcknowledge) {
+            includeAckRequested = requestAcknowledge;
+            pending = true;
+            Endpoint ep = message.getExchange().get(Endpoint.class);
+            Executor executor = ep.getExecutor();
+            if (null == executor) {
+                executor = ep.getService().getExecutor();
+            }
+            try {
+                executor.execute(this);
+            } catch (RejectedExecutionException ex) {
+                LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
+            }
+        }
+
+        /**
+         * ACK has been received for this candidate.
+         */
+        protected synchronized void resolved() {
+            pending = false;
+            skips = Integer.MAX_VALUE;
+        }
+
+        /**
+         * @return associated message context
+         */
+        protected Message getMessage() {
+            return message;
+        }
+
+        /**
+         * A resend has been attempted.
+         */
+        private synchronized void attempted() {
+            pending = false;
+        }
+    }
+    
+    /**
+     * Encapsulates actual resend logic (pluggable to facilitate unit testing)
+     */
+    public interface Resender {
+        /**
+         * Resend mechanics.
+         * 
+         * @param context the cloned message context.
+         * @param if a AckRequest should be included
+         */
+        void resend(Message message, boolean requestAcknowledge);
+    }
+    
+    /**
+     * Create default Resender logic.
+     * 
+     * @return default Resender
+     */
+    protected final Resender getDefaultResender() {
+        return new Resender() {
+            public void resend(Message message, boolean requestAcknowledge) {                
+                RMProperties properties = RMContextUtils.retrieveRMProperties(message, true);
+                SequenceType st = properties.getSequence();
+                if (st != null) {
+                    LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber());
+                }
+                try {
+                    // TODO: remove previously added acknowledgments and update
+                    // message id (to avoid duplicates)
+                    
+                    if (RMContextUtils.isRequestor(message)) {
+                        clientResend(message);
+                    } else {
+                        serverResend(message);
+                    }
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
+                }
+            }
+        };
+    };
+    
+    /**
+     * Plug in replacement resend logic (facilitates unit testing).
+     * 
+     * @param replacement resend logic
+     */
+    protected void replaceResender(Resender replacement) {
+        resender = replacement;
     }
 
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java Tue Nov 21 06:50:53 2006
@@ -23,9 +23,12 @@
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import junit.framework.TestCase;
 
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.Phase;
 import org.apache.cxf.phase.PhaseInterceptorChain;
@@ -70,17 +73,28 @@
         };
         RMOutInterceptor interceptor = control.createMock(RMOutInterceptor.class, mocked); 
         RMManager manager = control.createMock(RMManager.class);
-        EasyMock.expect(interceptor.getManager()).andReturn(manager).times(3);
+        EasyMock.expect(interceptor.getManager()).andReturn(manager).times(5);
         
         Message message = control.createMock(Message.class);
-
+        Exchange ex = control.createMock(Exchange.class);
+        EasyMock.expect(message.getExchange()).andReturn(ex).times(2);
+        EasyMock.expect(ex.getOutMessage()).andReturn(message);
+        EasyMock.expect(message.getContent(List.class)).andReturn(Collections.singletonList("CXF"));        
         EasyMock.expect(message.get(Message.REQUESTOR_ROLE)).andReturn(Boolean.TRUE).anyTimes();        
         EasyMock.expect(message.get(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND))
             .andReturn(maps).anyTimes();
         RMProperties rmpsOut = new RMProperties();
         EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_OUTBOUND)).andReturn(rmpsOut);
         EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(null);
-        
+        InterceptorChain chain = control.createMock(InterceptorChain.class);
+        EasyMock.expect(message.getInterceptorChain()).andReturn(chain);
+        chain.add(EasyMock.isA(RetransmissionInterceptor.class));
+        EasyMock.expectLastCall();
+        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        EasyMock.expect(manager.getRetransmissionQueue()).andReturn(queue);
+        queue.start();
+        EasyMock.expectLastCall();
+                
         Source source = control.createMock(Source.class);
         EasyMock.expect(manager.getSource(message)).andReturn(source);
         Destination destination = control.createMock(Destination.class);

Copied: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (from r477271, incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java)
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=477690&p1=incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java&r1=477271&p2=incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java&r2=477690
==============================================================================
--- incubator/cxf/tags/celtix/pre_apache/rt/ws/rm/src/test/java/org/objectweb/celtix/ws/rm/soap/RetransmissionQueueTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue Nov 21 06:50:53 2006
@@ -1,65 +1,61 @@
-package org.objectweb.celtix.ws.rm.soap;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 
+package org.apache.cxf.ws.rm.soap;
+
+import java.io.ByteArrayOutputStream;
 import java.io.OutputStream;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-
-import javax.xml.soap.Name;
-import javax.xml.soap.SOAPEnvelope;
-import javax.xml.soap.SOAPHeader;
-import javax.xml.soap.SOAPHeaderElement;
-import javax.xml.soap.SOAPMessage;
-import javax.xml.soap.SOAPPart;
-import javax.xml.ws.handler.MessageContext;
+import java.util.concurrent.Executor;
 
 import junit.framework.TestCase;
 
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.rm.Identifier;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.SequenceType;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.easymock.IMocksControl;
 import org.easymock.classextension.EasyMock;
-import org.objectweb.celtix.bindings.AbstractBindingBase;
-import org.objectweb.celtix.bindings.AbstractBindingImpl;
-import org.objectweb.celtix.bindings.DataBindingCallback;
-import org.objectweb.celtix.bindings.ServerDataBindingCallback;
-import org.objectweb.celtix.context.InputStreamMessageContext;
-import org.objectweb.celtix.context.ObjectMessageContext;
-import org.objectweb.celtix.context.OutputStreamMessageContext;
-import org.objectweb.celtix.handlers.HandlerInvoker;
-import org.objectweb.celtix.transports.ClientTransport;
-import org.objectweb.celtix.transports.ServerTransport;
-import org.objectweb.celtix.transports.Transport;
-import org.objectweb.celtix.workqueue.WorkQueue;
-import org.objectweb.celtix.ws.addressing.AddressingProperties;
-import org.objectweb.celtix.ws.addressing.AddressingPropertiesImpl;
-import org.objectweb.celtix.ws.addressing.soap.MAPCodec;
-import org.objectweb.celtix.ws.rm.Identifier;
-import org.objectweb.celtix.ws.rm.Names;
-import org.objectweb.celtix.ws.rm.RMProperties;
-import org.objectweb.celtix.ws.rm.SequenceType;
-import org.objectweb.celtix.ws.rm.SourceSequence;
-import org.objectweb.celtix.ws.rm.persistence.RMMessage;
-import org.objectweb.celtix.ws.rm.persistence.RMStore;
-
-import static org.objectweb.celtix.bindings.JAXWSConstants.DATABINDING_CALLBACK_PROPERTY;
-import static org.objectweb.celtix.context.ObjectMessageContext.REQUESTOR_ROLE_PROPERTY;
-import static org.objectweb.celtix.ws.addressing.JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND;
-import static org.objectweb.celtix.ws.rm.JAXWSRMConstants.RM_PROPERTIES_OUTBOUND;
 
 
 /**
  * Test resend logic.
  */
-public class RetransmissionQueueTest extends TestCase {
+public class RetransmissionQueueImplTest extends TestCase {
 
     private IMocksControl control;
-    private PersistenceHandler handler;
-    private WorkQueue workQueue;
+    private RMManager manager;
+    private Executor executor;
     private RetransmissionQueueImpl queue;
     private TestResender resender;
-    private List<ObjectMessageContext> contexts =
-        new ArrayList<ObjectMessageContext>();
+    private List<Message> messages =
+        new ArrayList<Message>();
     private List<RMProperties> properties =
         new ArrayList<RMProperties>();
     private List<SequenceType> sequences =
@@ -71,16 +67,17 @@
     
     public void setUp() {
         control = EasyMock.createNiceControl();
-        handler = createMock(PersistenceHandler.class);
-        queue = new RetransmissionQueueImpl(handler);
+        manager = createMock(RMManager.class);
+        queue = new RetransmissionQueueImpl(manager);
         resender = new TestResender();
         queue.replaceResender(resender);
-        workQueue = createMock(WorkQueue.class);
+        executor = createMock(Executor.class);
+        
     }
     
     public void tearDown() {
         control.verify();
-        contexts.clear();
+        messages.clear();
         properties.clear();
         sequences.clear();
         mocks.clear();
@@ -102,18 +99,14 @@
     }
     
     public void testCacheUnacknowledged() {
-        ObjectMessageContext context1 = setUpContext("sequence1");
-        ObjectMessageContext context2 = setUpContext("sequence2");
-        ObjectMessageContext context3 = setUpContext("sequence1");
-        
-        setupContextMAPs(context1);
-        setupContextMAPs(context2);
-        setupContextMAPs(context3);
+        Message message1 = setUpMessage("sequence1");
+        Message message2 = setUpMessage("sequence2");
+        Message message3 = setUpMessage("sequence1");
         
         ready();
         
         assertNotNull("expected resend candidate",
-                      queue.cacheUnacknowledged(context1));
+                      queue.cacheUnacknowledged(message1));
         assertEquals("expected non-empty unacked map", 
                      1,
                      queue.getUnacknowledged().size());
@@ -121,11 +114,11 @@
             queue.getUnacknowledged().get("sequence1");
         assertNotNull("expected non-null context list", sequence1List);
         assertSame("expected context list entry",
-                   context1,
-                   sequence1List.get(0).getContext());
+                   message1,
+                   sequence1List.get(0).getMessage());
 
         assertNotNull("expected resend candidate",
-                      queue.cacheUnacknowledged(context2));
+                      queue.cacheUnacknowledged(message2));
         assertEquals("unexpected unacked map size", 
                      2,
                      queue.getUnacknowledged().size());
@@ -133,11 +126,11 @@
             queue.getUnacknowledged().get("sequence2");
         assertNotNull("expected non-null context list", sequence2List);
         assertSame("expected context list entry",
-                   context2,
-                   sequence2List.get(0).getContext());
+                   message2,
+                   sequence2List.get(0).getMessage());
         
         assertNotNull("expected resend candidate",
-                      queue.cacheUnacknowledged(context3));
+                      queue.cacheUnacknowledged(message3));
         assertEquals("un expected unacked map size", 
                      2,
                      queue.getUnacknowledged().size());
@@ -145,8 +138,8 @@
             queue.getUnacknowledged().get("sequence1");
         assertNotNull("expected non-null context list", sequence1List);
         assertSame("expected context list entry",
-                   context3,
-                   sequence1List.get(1).getContext());
+                   message3,
+                   sequence1List.get(1).getMessage());
     }
     
     public void testPurgeAcknowledgedSome() {
@@ -157,12 +150,12 @@
         List<RetransmissionQueueImpl.ResendCandidate> sequenceList =
             new ArrayList<RetransmissionQueueImpl.ResendCandidate>();
         queue.getUnacknowledged().put("sequence1", sequenceList);
-        ObjectMessageContext context1 =
-            setUpContext("sequence1", messageNumbers[0]);
-        sequenceList.add(queue.createResendCandidate(context1));
-        ObjectMessageContext context2 =
-            setUpContext("sequence1", messageNumbers[1]);
-        sequenceList.add(queue.createResendCandidate(context2));
+        Message message1 =
+            setUpMessage("sequence1", messageNumbers[0]);
+        sequenceList.add(queue.createResendCandidate(message1));
+        Message message2 =
+            setUpMessage("sequence1", messageNumbers[1]);
+        sequenceList.add(queue.createResendCandidate(message2));
         ready();
 
         queue.purgeAcknowledged(sequence);
@@ -182,12 +175,12 @@
         List<RetransmissionQueueImpl.ResendCandidate> sequenceList =
             new ArrayList<RetransmissionQueueImpl.ResendCandidate>();
         queue.getUnacknowledged().put("sequence1", sequenceList);
-        ObjectMessageContext context1 =
-            setUpContext("sequence1", messageNumbers[0]);
-        sequenceList.add(queue.createResendCandidate(context1));
-        ObjectMessageContext context2 =
-            setUpContext("sequence1", messageNumbers[1]);
-        sequenceList.add(queue.createResendCandidate(context2));
+        Message message1 =
+            setUpMessage("sequence1", messageNumbers[0]);
+        sequenceList.add(queue.createResendCandidate(message1));
+        Message message2 =
+            setUpMessage("sequence1", messageNumbers[1]);
+        sequenceList.add(queue.createResendCandidate(message2));
         ready();
 
         queue.purgeAcknowledged(sequence);
@@ -213,12 +206,12 @@
             new ArrayList<RetransmissionQueueImpl.ResendCandidate>();
         
         queue.getUnacknowledged().put("sequence1", sequenceList);
-        ObjectMessageContext context1 =
-            setUpContext("sequence1", messageNumbers[0], false);
-        sequenceList.add(queue.createResendCandidate(context1));
-        ObjectMessageContext context2 =
-            setUpContext("sequence1", messageNumbers[1], false);
-        sequenceList.add(queue.createResendCandidate(context2));
+        Message message1 =
+            setUpMessage("sequence1", messageNumbers[0], false);
+        sequenceList.add(queue.createResendCandidate(message1));
+        Message message2 =
+            setUpMessage("sequence1", messageNumbers[1], false);
+        sequenceList.add(queue.createResendCandidate(message2));
         ready();
 
         assertEquals("unexpected unacked count", 
@@ -239,8 +232,9 @@
                      queue.countUnacknowledged(sequence));
     }
     
-    public void testPopulate() {
+    public void xtestPopulate() {
   
+        /*
         Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
         Collection<RMMessage> msgs = new ArrayList<RMMessage>();
         // List<Handler> handlerChain = new ArrayList<Handler>();
@@ -260,22 +254,10 @@
         MessageContext context = control.createMock(MessageContext.class);
         msg.getContext();
         EasyMock.expectLastCall().andReturn(context);
-        /*
-        AbstractBindingBase binding = control.createMock(AbstractBindingBase.class);
-        handler.getBinding();
-        EasyMock.expectLastCall().andReturn(binding).times(2);
-        AbstractBindingImpl abi = control.createMock(AbstractBindingImpl.class);
-        binding.getBindingImpl();
-        EasyMock.expectLastCall().andReturn(abi).times(2);
-        */
+        
         RMSoapHandler rmh = control.createMock(RMSoapHandler.class);
         MAPCodec wsah = control.createMock(MAPCodec.class);
-        /*
-        handlerChain.add(rmh);
-        handlerChain.add(wsah);
-        abi.getPostProtocolSystemHandlers();
-        EasyMock.expectLastCall().andReturn(handlerChain).times(2);
-        */
+
         handler.getWsaSOAPHandler();
         EasyMock.expectLastCall().andReturn(wsah);
         handler.getRMSoapHandler();
@@ -297,23 +279,22 @@
         
         queue.populate(sss);
         
-        assertTrue("queue is empty", !queue.isEmpty());    
+        assertTrue("queue is empty", !queue.isEmpty()); 
+        */
     }
     
     public void testResendInitiatorBackoffLogic() {
-        ObjectMessageContext context1 = setUpContext("sequence1");
-        ObjectMessageContext context2 = setUpContext("sequence2");
-        ObjectMessageContext context3 = setUpContext("sequence1");
-        setupContextMAPs(context1);
-        setupContextMAPs(context2);
-        setupContextMAPs(context3);
+        Message message1 = setUpMessage("sequence1");
+        Message message2 = setUpMessage("sequence2");
+        Message message3 = setUpMessage("sequence1");
+        
         ready();
         RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(context1);
+            queue.cacheUnacknowledged(message1);
         RetransmissionQueueImpl.ResendCandidate candidate2 =
-            queue.cacheUnacknowledged(context2);
+            queue.cacheUnacknowledged(message2);
         RetransmissionQueueImpl.ResendCandidate candidate3 =
-            queue.cacheUnacknowledged(context3);
+            queue.cacheUnacknowledged(message3);
         RetransmissionQueueImpl.ResendCandidate[] allCandidates = 
         {candidate1, candidate2, candidate3};
         boolean [] expectAckRequested = {true, true, false};
@@ -323,7 +304,7 @@
 
         // all 3 candidates due
         runInitiator(allCandidates);
-        runCandidates(allCandidates, expectAckRequested);        
+        runCandidates(allCandidates, expectAckRequested);  
                         
         // exponential backoff => none due
         runInitiator();
@@ -353,19 +334,16 @@
 
 
     public void testResendInitiatorDueLogic() {
-        ObjectMessageContext context1 = setUpContext("sequence1");
-        ObjectMessageContext context2 = setUpContext("sequence2");
-        ObjectMessageContext context3 = setUpContext("sequence1");
-        setupContextMAPs(context1);
-        setupContextMAPs(context2);
-        setupContextMAPs(context3);
+        Message message1 = setUpMessage("sequence1");
+        Message message2 = setUpMessage("sequence2");
+        Message message3 = setUpMessage("sequence1");
         ready();
         RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(context1);
+            queue.cacheUnacknowledged(message1);
         RetransmissionQueueImpl.ResendCandidate candidate2 =
-            queue.cacheUnacknowledged(context2);
+            queue.cacheUnacknowledged(message2);
         RetransmissionQueueImpl.ResendCandidate candidate3 =
-            queue.cacheUnacknowledged(context3);
+            queue.cacheUnacknowledged(message3);
         RetransmissionQueueImpl.ResendCandidate[] allCandidates = 
         {candidate1, candidate2, candidate3};
         boolean [] expectAckRequested = {true, true, false};
@@ -404,19 +382,16 @@
     }
     
     public void testResendInitiatorResolvedLogic() {
-        ObjectMessageContext context1 = setUpContext("sequence1");
-        ObjectMessageContext context2 = setUpContext("sequence2");
-        ObjectMessageContext context3 = setUpContext("sequence1");
-        setupContextMAPs(context1);
-        setupContextMAPs(context2);
-        setupContextMAPs(context3);
+        Message message1 = setUpMessage("sequence1");
+        Message message2 = setUpMessage("sequence2");
+        Message message3 = setUpMessage("sequence1");
         ready();
         RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(context1);
+            queue.cacheUnacknowledged(message1);
         RetransmissionQueueImpl.ResendCandidate candidate2 =
-            queue.cacheUnacknowledged(context2);
+            queue.cacheUnacknowledged(message2);
         RetransmissionQueueImpl.ResendCandidate candidate3 =
-            queue.cacheUnacknowledged(context3);
+            queue.cacheUnacknowledged(message3);
         RetransmissionQueueImpl.ResendCandidate[] allCandidates = 
         {candidate1, candidate2, candidate3};
         boolean [] expectAckRequested = {true, true, false};
@@ -444,28 +419,29 @@
         runInitiator();
     }
 
-    public void testResenderInitiatorNoRescheduleOnShutdown() {
+    public void xtestResenderInitiatorNoRescheduleOnShutdown() {
+        /*
         ready();
         
         queue.shutdown();
         queue.getResendInitiator().run();
+        */
     }
     
     public void testDefaultResenderClient() throws Exception {
         doTestDefaultResender(true);
     }
     
-    public void testDefaultResenderServer() throws Exception {
+    public void xtestDefaultResenderServer() throws Exception {
         doTestDefaultResender(false);
     }
 
     private void doTestDefaultResender(boolean isRequestor) throws Exception {
-        ObjectMessageContext context1 = setUpContext("sequence1");
-        setupContextMAPs(context1);
+        Message message1 = setUpMessage("sequence1");
         queue.replaceResender(queue.getDefaultResender());
         ready();
         RetransmissionQueueImpl.ResendCandidate candidate1 =
-            queue.cacheUnacknowledged(context1);
+            queue.cacheUnacknowledged(message1);
         RetransmissionQueueImpl.ResendCandidate[] allCandidates = {candidate1};
     
         // initial run => none due
@@ -473,41 +449,36 @@
     
         // single candidate due
         runInitiator(allCandidates);
-        setUpDefaultResender(0, isRequestor, context1);
+        setUpDefaultResender(0, isRequestor, message1);
         allCandidates[0].run();
     }
 
-    private ObjectMessageContext setUpContext(String sid) {
-        return setUpContext(sid, null);
+    private Message setUpMessage(String sid) {
+        return setUpMessage(sid, null);
     }
 
-    private ObjectMessageContext setUpContext(String sid,
+    private Message setUpMessage(String sid,
                                         BigInteger messageNumber) {
-        return setUpContext(sid, messageNumber, true);
+        return setUpMessage(sid, messageNumber, true);
     }
 
-    private ObjectMessageContext setUpContext(String sid,
+    private Message setUpMessage(String sid,
                                         BigInteger messageNumber,
                                         boolean storeSequence) {
-        ObjectMessageContext context =
-            createMock(ObjectMessageContext.class);
+        Message message =
+            createMock(Message.class);
         if (storeSequence) {
-            setUpSequenceType(context, sid, messageNumber);
+            setUpSequenceType(message, sid, messageNumber);
         }
-        contexts.add(context);
+        messages.add(message);
         
-        return context;
-    }
-    
-    private void setupContextMAPs(ObjectMessageContext context) {
-        AddressingPropertiesImpl maps = createMock(AddressingPropertiesImpl.class);
-        context.get(CLIENT_ADDRESSING_PROPERTIES_OUTBOUND);
-        EasyMock.expectLastCall().andReturn(maps);
+        return message;
     }
-    
+   
+    /*
     private void setupContextMessage(ObjectMessageContext context) throws Exception {
         SOAPMessage message = createMock(SOAPMessage.class);
-        context.get("org.objectweb.celtix.bindings.soap.message");
+        context.get("org.apache.cxf.bindings.soap.message");
         EasyMock.expectLastCall().andReturn(message);
         SOAPPart part = createMock(SOAPPart.class);
         message.getSOAPPart();
@@ -549,81 +520,60 @@
         headerElements.hasNext();
         EasyMock.expectLastCall().andReturn(false);
     }
+    */
 
     private void ready() {
         control.replay();
-        queue.start(workQueue);
+        queue.start();
     }
     
     private void setUpDefaultResender(int i,
                                       boolean isRequestor,
-                                      ObjectMessageContext context) 
+                                      Message context) 
         throws Exception {
-        assertTrue("too few contexts", i < contexts.size());
+        assertTrue("too few contexts", i < messages.size());
         assertTrue("too few properties", i < properties.size());
         assertTrue("too few sequences", i < sequences.size());
         control.verify();
-        control.reset();
+        control.reset();  
         
-        contexts.get(i).get(RM_PROPERTIES_OUTBOUND);
-        EasyMock.expectLastCall().andReturn(properties.get(i)).times(2);
+        messages.get(i).get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
+        EasyMock.expectLastCall().andReturn(properties.get(i)).times(1);
         properties.get(i).getSequence();
-        EasyMock.expectLastCall().andReturn(sequences.get(i)).times(2);
-        
-        setupContextMessage(context);
+        EasyMock.expectLastCall().andReturn(sequences.get(i)).times(1);
         
-        contexts.get(i).get(REQUESTOR_ROLE_PROPERTY);
+        messages.get(i).get(Message.REQUESTOR_ROLE);
         EasyMock.expectLastCall().andReturn(Boolean.valueOf(isRequestor));
-        sequences.get(i).getIdentifier();
-        EasyMock.expectLastCall().andReturn(identifiers.get(i));
-        Transport transport = isRequestor
-                              ? createMock(ClientTransport.class)
-                              : createMock(ServerTransport.class);
-        if (isRequestor) {
-            handler.getClientTransport();
-            EasyMock.expectLastCall().andReturn(transport).times(2);
-        } else {
-            handler.getServerTransport(); 
-            EasyMock.expectLastCall().andReturn(transport).times(1);
-        }
-        AbstractBindingBase binding = 
-            createMock(AbstractBindingBase.class);
-        handler.getBinding();
-        EasyMock.expectLastCall().andReturn(binding);
-        HandlerInvoker handlerInvoker =
-            createMock(HandlerInvoker.class);
-        binding.createHandlerInvoker();
-        EasyMock.expectLastCall().andReturn(handlerInvoker);
-        AbstractBindingImpl bindingImpl = 
-            createMock(AbstractBindingImpl.class);
-        binding.getBindingImpl();
-        EasyMock.expectLastCall().andReturn(bindingImpl).times(isRequestor
-                                                               ? 6
-                                                               : 5);
-        bindingImpl.createBindingMessageContext(contexts.get(i));
-        MessageContext bindingContext = 
-            createMock(MessageContext.class);
-        EasyMock.expectLastCall().andReturn(bindingContext);
-        
-        OutputStreamMessageContext outputStreamContext =
-            createMock(OutputStreamMessageContext.class);
-        transport.createOutputStreamContext(bindingContext);
-        EasyMock.expectLastCall().andReturn(outputStreamContext);
         
         if (isRequestor) {
-            
-            setUpClientDispatch(handlerInvoker,
-                                binding,
-                                outputStreamContext,
-                                bindingImpl,
-                                transport);
-        } else {
-            setUpServerDispatch(bindingContext, outputStreamContext);
+            Exchange ex = createMock(Exchange.class);
+            messages.get(i).getExchange();
+            EasyMock.expectLastCall().andReturn(ex);
+            Conduit conduit = createMock(Conduit.class);
+            ex.getConduit();
+            EasyMock.expectLastCall().andReturn(conduit);
+            conduit.send(messages.get(i));
+            EasyMock.expectLastCall();
+            OutputStream os = createMock(OutputStream.class);
+            messages.get(i).getContent(OutputStream.class);
+            EasyMock.expectLastCall().andReturn(os).times(2);
+            ByteArrayOutputStream saved = createMock(ByteArrayOutputStream.class);
+            messages.get(i).get(RMMessageConstants.SAVED_OUTPUT_STREAM);
+            EasyMock.expectLastCall().andReturn(saved);
+            byte[] content = "the saved message".getBytes();
+            saved.toByteArray();
+            EasyMock.expectLastCall().andReturn(content);
+            os.write(EasyMock.isA(byte[].class), EasyMock.eq(0), EasyMock.eq(content.length));
+            EasyMock.expectLastCall();
+            os.flush();
+            EasyMock.expectLastCall();
+            os.close();
+            EasyMock.expectLastCall(); 
         }
-        
         control.replay();
     }
 
+    /*
     private void setUpClientDispatch(
                               HandlerInvoker handlerInvoker,
                               AbstractBindingBase binding,
@@ -653,7 +603,9 @@
         bindingImpl.unmarshal(bindingContext, objectContext, null);
         EasyMock.expectLastCall();
     }
+    */
 
+    /*
     private void setUpServerDispatch(
                             MessageContext bindingContext,
                             OutputStreamMessageContext outputStreamContext) {
@@ -665,6 +617,7 @@
         outputStreamContext.getOutputStream();
         EasyMock.expectLastCall().andReturn(outputStream);
     }
+    */
 
     private void runInitiator() {
         runInitiator(null);
@@ -674,18 +627,22 @@
                        RetransmissionQueueImpl.ResendCandidate[] dueCandidates) {
         control.verify();
         control.reset();
-        
+
         for (int i = 0; 
              dueCandidates != null && i < dueCandidates.length;
              i++) {
-            workQueue.execute(dueCandidates[i]);
+            Exchange ex = createMock(Exchange.class);
+            dueCandidates[i].getMessage().getExchange();
+            EasyMock.expectLastCall().andReturn(ex);
+            Endpoint ep = createMock(Endpoint.class);
+            ex.get(Endpoint.class);
+            EasyMock.expectLastCall().andReturn(ep);
+            ep.getExecutor();
+            EasyMock.expectLastCall().andReturn(executor);
+            executor.execute(dueCandidates[i]);
             EasyMock.expectLastCall();
         }
-        /*
-        workQueue.schedule(queue.getResendInitiator(), 
-                           queue.getBaseRetransmissionInterval());         
-        EasyMock.expectLastCall();
-        */
+        
         control.replay();
         queue.getResendInitiator().run();
     }
@@ -699,23 +656,23 @@
                          expectAckRequested[i],
                          resender.includeAckRequested);
             assertSame("unexpected context",
-                       candidates[i].getContext(),
-                       resender.context);
+                       candidates[i].getMessage(),
+                       resender.message);
             resender.clear();
         }
     }
     
-    private SequenceType setUpSequenceType(ObjectMessageContext context,
+    private SequenceType setUpSequenceType(Message message,
                                            String sid,
                                            BigInteger messageNumber) {
         RMProperties rmps = createMock(RMProperties.class);
-        if (context != null) {
-            context.get(RM_PROPERTIES_OUTBOUND);
+        if (message != null) {
+            message.get(RMMessageConstants.RM_PROPERTIES_OUTBOUND);
             EasyMock.expectLastCall().andReturn(rmps);
         } 
         properties.add(rmps);
         SequenceType sequence = createMock(SequenceType.class);
-        if (context != null) {
+        if (message != null) {
             rmps.getSequence();
             EasyMock.expectLastCall().andReturn(sequence);
         }
@@ -757,7 +714,7 @@
             sequence.getIdentifier();
             EasyMock.expectLastCall().andReturn(id);
             RMStore store = createMock(RMStore.class);
-            handler.getStore();
+            manager.getStore();
             EasyMock.expectLastCall().andReturn(store);
         }
         return sequence;
@@ -778,16 +735,16 @@
     }
     
     private static class TestResender implements RetransmissionQueueImpl.Resender {
-        ObjectMessageContext context;
+        Message message;
         boolean includeAckRequested;
         
-        public void resend(ObjectMessageContext ctx, boolean requestAcknowledge) {
-            context = ctx;
+        public void resend(Message ctx, boolean requestAcknowledge) {
+            message = ctx;
             includeAckRequested = requestAcknowledge;
         }
         
         void clear() {
-            context = null;
+            message = null;
             includeAckRequested = false;            
         }
     };

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ControlImpl.java Tue Nov 21 06:50:53 2006
@@ -33,6 +33,8 @@
 import org.apache.cxf.greeter_control.Control;
 import org.apache.cxf.greeter_control.types.StartGreeterResponse;
 import org.apache.cxf.greeter_control.types.StopGreeterResponse;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RetransmissionQueue;
 
 
 @WebService(serviceName = "ControlService", 
@@ -73,6 +75,11 @@
         }
         endpoint = null;
         if (null != greeterBus) {
+            RMManager manager = greeterBus.getExtension(RMManager.class);
+            RetransmissionQueue queue = manager.getRetransmissionQueue();
+            if (null != queue) {
+                queue.stop();
+            }
             greeterBus.shutdown(true);
         }
         return true;

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/MessageFlow.java Tue Nov 21 06:50:53 2006
@@ -410,6 +410,13 @@
         }
     }
     
+    public void purge() {
+        inboundMessages.clear();
+        outboundMessages.clear();
+        inStreams.clear();
+        outStreams.clear();
+    }
+    
     public void verifyPartialResponses(int nExpected) throws Exception {
         verifyPartialResponses(nExpected, null);
     }

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/OutMessageRecorder.java Tue Nov 21 06:50:53 2006
@@ -50,7 +50,6 @@
     public OutMessageRecorder() {
         outbound = new ArrayList<byte[]>();
         setPhase(Phase.PRE_PROTOCOL);
-        // setPhase(Phase.POST_STREAM);
     }
     
     public void handleMessage(Message message) throws Fault {

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=477690&r1=477689&r2=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Tue Nov 21 06:50:53 2006
@@ -35,6 +35,7 @@
 import org.apache.cxf.systest.common.ClientServerTestBase;
 import org.apache.cxf.ws.rm.RMConstants;
 import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RetransmissionQueue;
 
 
 /**
@@ -67,6 +68,7 @@
     private boolean doTestOnewayDeferredAnonymousAcks = testAll;
     private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
     private boolean doTestOnewayAnonymousAcksSequenceLength1 = testAll;
+    private boolean doTestOnewayAnonymousAcksSupressed = testAll;
     private boolean doTestTwowayNonAnonymous = testAll;
     private boolean doTestTwowayNonAnonymousDeferred = testAll;
     private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
@@ -101,11 +103,17 @@
     
     public void tearDown() {
         if (null != greeter) {
-            assertTrue("Failed to stop greeter.", control.stopGreeter());            
+            assertTrue("Failed to stop greeter.", control.stopGreeter());                        
+            RMManager manager = greeterBus.getExtension(RMManager.class);
+            RetransmissionQueue queue = manager.getRetransmissionQueue();
+            if (null != queue) {
+                queue.stop();
+            }
             greeterBus.shutdown(true);
             greeterBus = null;
         }
-        if (null != control) {               
+        if (null != control) {  
+            assertTrue("Failed to stop greeter", control.stopGreeter());
             controlBus.shutdown(true);
         }
     }
@@ -271,6 +279,53 @@
         mf.verifyMessageNumbers(new String[] {null, null, null, null, null, null}, false);
         mf.verifyLastMessage(new boolean[] {false, false, false, false, false, false}, false);
         mf.verifyAcknowledgements(new boolean[] {false, true, false, false, true, false}, false);
+    }
+    
+    public void testOnewayAnonymousAcksSupressed() throws Exception {
+
+        if (!doTestOnewayAnonymousAcksSupressed) {
+            return;
+        }
+        setupGreeter("org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml");
+
+        greeter.greetMeOneWay("once");
+        greeter.greetMeOneWay("twice");
+        greeter.greetMeOneWay("thrice");
+
+        // three application messages plus createSequence
+        
+        awaitMessages(4, 4, 2000);
+        
+        MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+        
+        mf.verifyMessages(4, true);
+        String[] expectedActions = new String[] {RMConstants.getCreateSequenceAction(), 
+                                                 GREETMEONEWAY_ACTION,
+                                                 GREETMEONEWAY_ACTION, 
+                                                 GREETMEONEWAY_ACTION};
+        mf.verifyActions(expectedActions, true);
+        mf.verifyMessageNumbers(new String[] {null, "1", "2", "3"}, true);
+
+        // createSequenceResponse plus 3 partial responses, none of which
+        // contain an acknowledgment
+
+        mf.verifyMessages(4, false);
+        mf.verifyPartialResponses(3, new boolean[3]);
+        mf.purgePartialResponses();
+        
+        expectedActions = new String[] {RMConstants.getCreateSequenceResponseAction()};
+        mf.verifyActions(expectedActions, false);
+        
+        mf.purge();
+        assertEquals(0, outRecorder.getOutboundMessages().size());
+        assertEquals(0, inRecorder.getInboundMessages().size());
+
+        // allow resends to kick in
+        // await multiple of 3 resends to avoid shutting down server
+        // in the course of retransmission - this is harmless but pollutes test output
+        
+        awaitMessages(3, 0, 5000);
+        
     }
     
     public void testTwowayNonAnonymous() throws Exception {

Added: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml?view=auto&rev=477690
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml (added)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml Tue Nov 21 06:50:53 2006
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:wsrm-mgmt="http://cxf.apache.org/ws/rm/manager"
+       xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+       xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+    
+    <import resource="rminterceptors.xml"/>
+    
+    <bean id="org.apache.cxf.ws.rm.RMManager" class="org.apache.cxf.ws.rm.RMManager">
+        <property name="bus" ref="cxf"/>  
+        <property name="destinationPolicy">
+            <value>
+                <wsrm-mgmt:destinationPolicy>
+                    <wsrm-mgmt:acksPolicy intraMessageThreshold="0"/>                    
+                </wsrm-mgmt:destinationPolicy>
+            </value>
+        </property>
+        
+        <property name="RMAssertion">
+            <value>
+                <wsrm-policy:RMAssertion>         
+                    <wsrm-policy:BaseRetransmissionInterval Milliseconds="3000"/>           
+                    <wsrm-policy:AcknowledgementInterval Milliseconds="99999999"/>                                                        
+                </wsrm-policy:RMAssertion>
+            </value>
+        </property> 
+    </bean>   
+    
+</beans>
\ No newline at end of file

Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/anonymous-suppressed.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml