You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/05/24 19:33:13 UTC

svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Author: andreasmyth
Date: Thu May 24 10:33:11 2007
New Revision: 541364

URL: http://svn.apache.org/viewvc?view=rev&rev=541364
Log:
[JIRA CXF-139] Client-side recovery. 
Added client lifecycle management interface to be notified of client creation so that recovery can take place before client sends any requests.
Fixed minor bug in handling TerminateSequence invocation.

Added:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java   (with props)
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java   (with props)
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUtilsTest.java   (with props)
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java   (with props)
Removed:
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/PersistenceTest.java
Modified:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java
    incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java
    incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-client-crash.xml

Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java Thu May 24 10:33:11 2007
@@ -92,4 +92,11 @@
      * @param selector the ConduitSelector to use
      */
     void setConduitSelector(ConduitSelector selector);
+    
+    /**
+     * Indicates that the client is no longer needed and that any resources it holds
+     * can now be freed.
+     *
+     */
+    void destroy();
 }

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java?view=auto&rev=541364
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java Thu May 24 10:33:11 2007
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+public interface ClientLifeCycleListener {
+    void clientCreated(Client client);
+    void clientDestroyed(Client client); 
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java?view=auto&rev=541364
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java Thu May 24 10:33:11 2007
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+
+public interface ClientLifeCycleManager { 
+    void clientCreated(Client client);
+    void clientDestroyed(Client client);
+    void registerListener(ClientLifeCycleListener listener);
+    void unRegisterListener(ClientLifeCycleListener listener);
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java (original)
+++ incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java Thu May 24 10:33:11 2007
@@ -63,6 +63,10 @@
         }
 
         BindingOperationInfo bop = exchange.get(BindingOperationInfo.class);
+        if (null == bop) {
+            return;
+        }
+
         if (bop.isUnwrapped()) {
             bop = bop.getWrappedOperation();
         }

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Thu May 24 10:33:11 2007
@@ -85,6 +85,7 @@
         bus = b;
         outFaultObserver = new ClientOutFaultObserver(bus);
         getConduitSelector(sc).setEndpoint(e);
+        notifyLifecycleManager();
     }
 
     public ClientImpl(URL wsdlUrl) {
@@ -109,6 +110,24 @@
         } catch (EndpointException epex) {
             throw new IllegalStateException("Unable to create endpoint: " + epex.getMessage(), epex);
         }
+        notifyLifecycleManager();
+    }
+    
+    public void destroy() {
+        
+        // TODO: also inform the conduit so it can shutdown any response listeners
+        
+        ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
+        if (null != mgr) {
+            mgr.clientDestroyed(this);
+        }
+    }
+    
+    private void notifyLifecycleManager() {
+        ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
+        if (null != mgr) {
+            mgr.clientCreated(this);
+        }
     }
 
     private EndpointInfo findEndpoint(Service svc, QName port) {
@@ -226,7 +245,7 @@
         // setup conduit selector
         prepareConduitSelector(message);
         
-        // execute chain
+        // execute chain        
         chain.doIntercept(message);
 
         getConduitSelector().complete(exchange);

Added: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java?view=auto&rev=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java (added)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java Thu May 24 10:33:11 2007
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cxf.extension.BusExtension;
+
+public class ClientLifeCycleManagerImpl implements ClientLifeCycleManager, BusExtension {
+    
+    private List<ClientLifeCycleListener> listeners = new ArrayList<ClientLifeCycleListener>(); 
+
+    public Class<?> getRegistrationType() {
+        return ClientLifeCycleManager.class;
+    }
+
+    public void registerListener(ClientLifeCycleListener listener) {
+        listeners.add(listener);
+    }
+
+    public void clientCreated(Client client) {
+        if (null != listeners) {
+            for (ClientLifeCycleListener listener : listeners) {
+                listener.clientCreated(client);
+            }
+        }
+    }
+
+    public void clientDestroyed(Client client) {
+        if (null != listeners) {
+            for (ClientLifeCycleListener listener : listeners) {
+                listener.clientDestroyed(client);
+            }
+        } 
+    }
+
+    public void unRegisterListener(ClientLifeCycleListener listener) {
+        listeners.remove(listener);
+    }
+
+}

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java Thu May 24 10:33:11 2007
@@ -22,15 +22,15 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
+import org.apache.cxf.extension.BusExtension;
 
-import org.apache.cxf.Bus;
-
-public class ServerLifeCycleManagerImpl implements ServerLifeCycleManager {
+public class ServerLifeCycleManagerImpl implements ServerLifeCycleManager, BusExtension {
     
     private List<ServerLifeCycleListener> listeners = new ArrayList<ServerLifeCycleListener>();
-    private Bus bus;
+
+    public Class<?> getRegistrationType() {
+        return ServerLifeCycleManager.class;
+    }
 
     public synchronized void registerListener(ServerLifeCycleListener listener) {
         listeners.add(listener);
@@ -62,21 +62,5 @@
 
     public synchronized void unRegisterListener(ServerLifeCycleListener listener) {
         listeners.remove(listener);
-    }
-    
-    public Bus getBus() {
-        return bus;
-    }
-    
-    @Resource
-    public void setBus(Bus bus) {        
-        this.bus = bus;        
-    }
-    
-    @PostConstruct
-    public void register() {
-        if (null != bus) {
-            bus.setExtension(this, ServerLifeCycleManager.class);
-        }
     }
 }

Modified: incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml (original)
+++ incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml Thu May 24 10:33:11 2007
@@ -88,9 +88,9 @@
         <property name="bus" ref="cxf"/>
     </bean>
 
-    <bean id="org.apache.cxf.endpoint.ServerLifeCycleManager" class="org.apache.cxf.endpoint.ServerLifeCycleManagerImpl">
-        <property name="bus" ref="cxf"/>
-    </bean>
+    <bean id="org.apache.cxf.endpoint.ServerLifeCycleManager" class="org.apache.cxf.endpoint.ServerLifeCycleManagerImpl"/>
+    <bean id="org.apache.cxf.endpoint.ClientLifeCycleManager" class="org.apache.cxf.endpoint.ClientLifeCycleManagerImpl"/>
+        
 
     <bean id="org.apache.cxf.transports.http.QueryHandlerRegistry" class="org.apache.cxf.transport.http.QueryHandlerRegistryImpl">
         <property name="bus" ref="cxf"/>

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java Thu May 24 10:33:11 2007
@@ -30,7 +30,7 @@
     }
     
     public String getName() {
-        return reliableEndpoint.getName();
+        return RMUtils.getEndpointIdentifier(getEndpoint());
     }
     
     /** 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Thu May 24 10:33:11 2007
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -184,7 +185,10 @@
         
         OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
             .getOperation(RMConstants.getLastMessageOperationName());
-        invoke(oi, new Object[] {}, null);
+        // pass reference to source sequence in invocation context
+        Map<String, Object> context = Collections.singletonMap(SourceSequence.class.getName(), (Object)s);
+
+        invoke(oi, new Object[] {}, context);
     }
     
     void ackRequested(SourceSequence s) throws RMException {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Thu May 24 10:33:11 2007
@@ -149,7 +149,7 @@
         ((AddressingPropertiesImpl)maps).exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
     }
 
-    private static String getRMPropertiesKey(boolean outbound) {
+    public static String getRMPropertiesKey(boolean outbound) {
         return outbound
             ? RMMessageConstants.RM_PROPERTIES_OUTBOUND : RMMessageConstants.RM_PROPERTIES_INBOUND;
     }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Thu May 24 10:33:11 2007
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.List;
 import java.util.logging.Level;
@@ -58,27 +57,25 @@
 import org.apache.neethi.Policy;
 
 public class RMEndpoint {
-    
+
     private static final Logger LOG = LogUtils.getL7dLogger(RMEndpoint.class);
-    
-    private static final QName SERVICE_NAME = 
-        new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractService");
-    private static final QName INTERFACE_NAME = 
-         new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractPortType");
-    private static final QName BINDING_NAME = 
-        new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractSoapBinding");
-
-    private static final QName CREATE_PART_NAME =
-        new QName(RMConstants.getWsdlNamespace(), "create");
-    private static final QName CREATE_RESPONSE_PART_NAME =
-        new QName(RMConstants.getWsdlNamespace(), "createResponse");
-    private static final QName TERMINATE_PART_NAME =
-        new QName(RMConstants.getWsdlNamespace(), "terminate");
-        
+
+    private static final QName SERVICE_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                        "SequenceAbstractService");
+    private static final QName INTERFACE_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                          "SequenceAbstractPortType");
+    private static final QName BINDING_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                        "SequenceAbstractSoapBinding");
+
+    private static final QName CREATE_PART_NAME = new QName(RMConstants.getWsdlNamespace(), "create");
+    private static final QName CREATE_RESPONSE_PART_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                                     "createResponse");
+    private static final QName TERMINATE_PART_NAME = new QName(RMConstants.getWsdlNamespace(), "terminate");
+
     private RMManager manager;
     private Endpoint applicationEndpoint;
     private Conduit conduit;
-    private org.apache.cxf.ws.addressing.EndpointReferenceType replyTo; 
+    private org.apache.cxf.ws.addressing.EndpointReferenceType replyTo;
     private Source source;
     private Destination destination;
     private WrappedService service;
@@ -87,7 +84,7 @@
     private Servant servant;
     private long lastApplicationMessage;
     private long lastControlMessage;
-     
+
     public RMEndpoint(RMManager m, Endpoint ae) {
         manager = m;
         applicationEndpoint = ae;
@@ -96,56 +93,49 @@
         proxy = new Proxy(this);
         servant = new Servant(this);
     }
-    
-    public String getName() {
-        return MessageFormat.format("{0}.{1}", new Object[] {
-            applicationEndpoint.getEndpointInfo().getService().getName(),
-            applicationEndpoint.getEndpointInfo().getName()
-        });
-    }
-    
+
     /**
      * @return Returns the bus.
      */
     public RMManager getManager() {
         return manager;
     }
-      
+
     /**
      * @return Returns the application endpoint.
      */
     public Endpoint getApplicationEndpoint() {
         return applicationEndpoint;
     }
-    
+
     /**
      * @return Returns the RM protocol endpoint.
      */
     public Endpoint getEndpoint() {
         return endpoint;
     }
-    
+
     /**
      * @return Returns the RM protocol service.
      */
     public Service getService() {
         return service;
     }
-    
+
     /**
      * @return Returns the RM protocol binding info.
      */
     public BindingInfo getBindingInfo() {
         return service.getServiceInfo().getBinding(BINDING_NAME);
     }
-    
+
     /**
      * @return Returns the proxy.
      */
     public Proxy getProxy() {
         return proxy;
     }
-    
+
     /**
      * @return Returns the servant.
      */
@@ -153,35 +143,35 @@
         return servant;
     }
 
-    /** 
+    /**
      * @return Returns the destination.
      */
     public Destination getDestination() {
         return destination;
     }
-    
+
     /**
      * @param destination The destination to set.
      */
     public void setDestination(Destination destination) {
         this.destination = destination;
     }
-    
-    /** 
+
+    /**
      * @return Returns the source.
      */
     public Source getSource() {
         return source;
     }
-    
+
     /**
      * @param source The source to set.
      */
     public void setSource(Source source) {
         this.source = source;
-    } 
-    
-    /** 
+    }
+
+    /**
      * @return The time when last application message was received.
      */
     public long getLastApplicationMessage() {
@@ -195,7 +185,7 @@
         lastApplicationMessage = System.currentTimeMillis();
     }
 
-    /** 
+    /**
      * @return The time when last RM protocol message was received.
      */
     public long getLastControlMessage() {
@@ -208,46 +198,44 @@
     public void receivedControlMessage() {
         lastControlMessage = System.currentTimeMillis();
     }
-    
-    /** 
+
+    /**
      * @return Returns the conduit.
      */
     public Conduit getConduit() {
         return conduit;
     }
 
-    /** 
-     * Returns the replyTo address of the first application request, i.e. the target address to which to 
-     * send CreateSequence, CreateSequenceResponse and TerminateSequence messages originating from the
-     * from the server.
+    /**
+     * Returns the replyTo address of the first application request, i.e. the
+     * target address to which to send CreateSequence, CreateSequenceResponse
+     * and TerminateSequence messages originating from the from the server.
+     * 
      * @return the replyTo address
      */
     org.apache.cxf.ws.addressing.EndpointReferenceType getReplyTo() {
         return replyTo;
     }
-    
-    
-    void initialise(Conduit c, org.apache.cxf.ws.addressing.EndpointReferenceType r) {  
+
+    void initialise(Conduit c, org.apache.cxf.ws.addressing.EndpointReferenceType r) {
         conduit = c;
         replyTo = r;
         createService();
         createEndpoint();
         setPolicies();
     }
-    
+
     void createService() {
         ServiceInfo si = new ServiceInfo();
         si.setName(SERVICE_NAME);
         buildInterfaceInfo(si);
-        
+
         service = new WrappedService(applicationEndpoint.getService(), SERVICE_NAME, si);
-        
+
         DataBinding dataBinding = null;
         try {
-            dataBinding = new JAXBDataBinding(CreateSequenceType.class,
-                                              CreateSequenceResponseType.class,
-                                              TerminateSequenceType.class,
-                                              SequenceFaultType.class);
+            dataBinding = new JAXBDataBinding(CreateSequenceType.class, CreateSequenceResponseType.class,
+                                              TerminateSequenceType.class, SequenceFaultType.class);
         } catch (JAXBException e) {
             throw new ServiceConstructionException(e);
         }
@@ -261,57 +249,57 @@
         EndpointInfo aei = applicationEndpoint.getEndpointInfo();
         String transportId = aei.getTransportId();
         EndpointInfo ei = new EndpointInfo(si, transportId);
-        
+
         ei.setAddress(aei.getAddress());
-        
+
         ei.setName(RMConstants.getPortName());
         ei.setBinding(si.getBinding(BINDING_NAME));
 
-        // if addressing was enabled on the application endpoint by means 
-        // of the UsingAddressing element extensor, use this for the 
+        // if addressing was enabled on the application endpoint by means
+        // of the UsingAddressing element extensor, use this for the
         // RM endpoint also
-        
+
         Object ua = getUsingAddressing(aei);
         if (null != ua) {
             ei.addExtensor(ua);
-        } 
+        }
         si.addEndpoint(ei);
-        
+
         endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
         service.setEndpoint(endpoint);
     }
-    
+
     void setPolicies() {
         // use same WS-policies as for application endpoint
-        PolicyEngine engine = manager.getBus().getExtension(PolicyEngine.class);  
+        PolicyEngine engine = manager.getBus().getExtension(PolicyEngine.class);
         if (null == engine || !engine.isEnabled()) {
             return;
         }
-        
+
         EndpointInfo ei = getEndpoint().getEndpointInfo();
-                
-        PolicyInterceptorProviderRegistry reg = 
-            manager.getBus().getExtension(PolicyInterceptorProviderRegistry.class);
-        EndpointPolicy ep = null == conduit
-            ? engine.getServerEndpointPolicy(applicationEndpoint.getEndpointInfo(), null)
-            : engine.getClientEndpointPolicy(applicationEndpoint.getEndpointInfo(), conduit);
-        
+
+        PolicyInterceptorProviderRegistry reg = manager.getBus()
+            .getExtension(PolicyInterceptorProviderRegistry.class);
+        EndpointPolicy ep = null == conduit ? engine.getServerEndpointPolicy(applicationEndpoint
+            .getEndpointInfo(), null) : engine.getClientEndpointPolicy(applicationEndpoint.getEndpointInfo(),
+                                                                       conduit);
+
         engine.setEndpointPolicy(ei, ep);
-        
+
         EffectivePolicy effectiveOutbound = new EffectivePolicyImpl(ep, reg, true, false);
         EffectivePolicy effectiveInbound = new EffectivePolicyImpl(ep, reg, false, false);
-        
+
         BindingInfo bi = ei.getBinding();
         Collection<BindingOperationInfo> bois = bi.getOperations();
-        
+
         for (BindingOperationInfo boi : bois) {
             engine.setEffectiveServerRequestPolicy(ei, boi, effectiveInbound);
             engine.setEffectiveServerResponsePolicy(ei, boi, effectiveOutbound);
 
             engine.setEffectiveClientRequestPolicy(ei, boi, effectiveOutbound);
-            engine.setEffectiveClientResponsePolicy(ei, boi, effectiveInbound);            
+            engine.setEffectiveClientResponsePolicy(ei, boi, effectiveInbound);
         }
-        
+
         // TODO: FaultPolicy (SequenceFault)
     }
 
@@ -326,12 +314,12 @@
         buildSequenceAckOperationInfo(ii);
         buildLastMessageOperationInfo(ii);
         buildAckRequestedOperationInfo(ii);
-        
+
         // TODO: FaultInfo (SequenceFault)
     }
 
     void buildCreateSequenceOperationInfo(InterfaceInfo ii) {
-        
+
         OperationInfo operationInfo = null;
         MessagePartInfo partInfo = null;
         MessageInfo messageInfo = null;
@@ -343,7 +331,7 @@
         partInfo.setElementQName(RMConstants.getCreateSequenceOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceType.class);
-        
+
         messageInfo = operationInfo.createMessage(RMConstants.getCreateSequenceResponseOperationName());
         operationInfo.setOutput(messageInfo.getName().getLocalPart(), messageInfo);
         partInfo = messageInfo.addMessagePart(CREATE_RESPONSE_PART_NAME);
@@ -351,7 +339,7 @@
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceResponseType.class);
         partInfo.setIndex(-1);
-        
+
         operationInfo = ii.addOperation(RMConstants.getCreateSequenceOnewayOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getCreateSequenceOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -359,7 +347,7 @@
         partInfo.setElementQName(RMConstants.getCreateSequenceOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceType.class);
-        
+
         operationInfo = ii.addOperation(RMConstants.getCreateSequenceResponseOnewayOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getCreateSequenceResponseOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -368,13 +356,13 @@
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceResponseType.class);
     }
-    
+
     void buildTerminateSequenceOperationInfo(InterfaceInfo ii) {
-        
+
         OperationInfo operationInfo = null;
         MessagePartInfo partInfo = null;
         MessageInfo messageInfo = null;
-        
+
         operationInfo = ii.addOperation(RMConstants.getTerminateSequenceOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getTerminateSequenceOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -393,7 +381,7 @@
         messageInfo = operationInfo.createMessage(RMConstants.getSequenceAckOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
-    
+
     void buildLastMessageOperationInfo(InterfaceInfo ii) {
 
         OperationInfo operationInfo = null;
@@ -403,7 +391,7 @@
         messageInfo = operationInfo.createMessage(RMConstants.getLastMessageOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
-    
+
     void buildAckRequestedOperationInfo(InterfaceInfo ii) {
 
         OperationInfo operationInfo = null;
@@ -423,51 +411,50 @@
             BindingOperationInfo boi = null;
             SoapOperationInfo soi = null;
 
-            boi = bi.buildOperation(RMConstants.getCreateSequenceOperationName(), 
-                RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
+            boi = bi.buildOperation(RMConstants.getCreateSequenceOperationName(), RMConstants
+                .getCreateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getCreateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), 
-                RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
+
+            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), RMConstants
+                .getTerminateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getTerminateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getSequenceAckOperationName(), 
-                null, null);
+
+            boi = bi.buildOperation(RMConstants.getSequenceAckOperationName(), null, null);
             assert null != boi;
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getSequenceAckAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
+
             boi = bi.buildOperation(RMConstants.getLastMessageOperationName(), null, null);
             assert null != boi;
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getLastMessageAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
+
             boi = bi.buildOperation(RMConstants.getAckRequestedOperationName(), null, null);
             assert null != boi;
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getAckRequestedAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getCreateSequenceOnewayOperationName(), 
-                RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
+
+            boi = bi.buildOperation(RMConstants.getCreateSequenceOnewayOperationName(), RMConstants
+                .getCreateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getCreateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
 
-            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOnewayOperationName(), 
-                RMConstants.getCreateSequenceResponseOperationName().getLocalPart(), null);
+            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOnewayOperationName(), RMConstants
+                .getCreateSequenceResponseOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getCreateSequenceResponseAction());
             boi.addExtensor(soi);
@@ -475,10 +462,10 @@
 
             si.addBinding(bi);
         }
-        
+
         // TODO: BindingFaultInfo (SequenceFault)
     }
-    
+
     Object getUsingAddressing(EndpointInfo endpointInfo) {
         if (null == endpointInfo) {
             return null;
@@ -489,21 +476,21 @@
         if (null != ua) {
             return ua;
         }
-        exts = endpointInfo.getBinding() != null
-            ? endpointInfo.getBinding().getExtensors(ExtensibilityElement.class) : null;
+        exts = endpointInfo.getBinding() != null ? endpointInfo.getBinding()
+            .getExtensors(ExtensibilityElement.class) : null;
         ua = getUsingAddressing(exts);
         if (null != ua) {
             return ua;
         }
-        exts = endpointInfo.getService() != null
-            ? endpointInfo.getService().getExtensors(ExtensibilityElement.class) : null;
+        exts = endpointInfo.getService() != null ? endpointInfo.getService()
+            .getExtensors(ExtensibilityElement.class) : null;
         ua = getUsingAddressing(exts);
         if (null != ua) {
             return ua;
         }
-        return ua;        
+        return ua;
     }
-    
+
     Object getUsingAddressing(List<ExtensibilityElement> exts) {
         Object ua = null;
         if (exts != null) {
@@ -515,25 +502,25 @@
         }
         return ua;
     }
-    
+
     void setAplicationEndpoint(Endpoint ae) {
         applicationEndpoint = ae;
     }
-    
+
     void setManager(RMManager m) {
         manager = m;
     }
-    
+
     void shutdown() {
         // cancel outstanding timer tasks (deferred acknowledgements)
         // and scheduled termination for all
         // destination sequences of this endpoint
-        
+
         for (DestinationSequence ds : getDestination().getAllSequences()) {
             ds.cancelDeferredAcknowledgments();
             ds.cancelTermination();
         }
-        
+
         // try terminating sequences
         SourcePolicyType sp = manager.getSourcePolicy();
         SequenceTerminationPolicyType stp = null;
@@ -541,47 +528,45 @@
             stp = sp.getSequenceTerminationPolicy();
         }
         if (null != stp && stp.isTerminateOnShutdown()) {
-            
+
             Collection<SourceSequence> seqs = source.getAllUnacknowledgedSequences();
             LOG.log(Level.FINE, "Trying to terminate {0} sequences", seqs.size());
             for (SourceSequence seq : seqs) {
                 try {
-                    // destination MUST respond with a 
+                    // destination MUST respond with a
                     // sequence acknowledgement
                     if (seq.isLastMessage()) {
                         // REVISIT: this may be non-standard
                         // getProxy().ackRequested(seq);
                     } else {
-                        
+
                         getProxy().lastMessage(seq);
                     }
                 } catch (RMException ex) {
                     // already logged
                 }
             }
-        }     
-        
+        }
+
         // cancel outstanding resends for all source sequences
         // of this endpoint
-        
+
         for (SourceSequence ss : getSource().getAllSequences()) {
             manager.getRetransmissionQueue().stop(ss);
         }
     }
-    
-    
-    
+
     class EffectivePolicyImpl implements EffectivePolicy {
-        
+
         private EndpointPolicy endpointPolicy;
         private List<Interceptor> interceptors;
 
-        EffectivePolicyImpl(EndpointPolicy ep, PolicyInterceptorProviderRegistry reg, 
-                            boolean outbound, boolean fault) {
+        EffectivePolicyImpl(EndpointPolicy ep, PolicyInterceptorProviderRegistry reg, boolean outbound,
+                            boolean fault) {
             endpointPolicy = ep;
             interceptors = reg.getInterceptors(endpointPolicy.getChosenAlternative(), outbound, fault);
         }
-        
+
         public Collection<Assertion> getChosenAlternative() {
             return endpointPolicy.getChosenAlternative();
         }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu May 24 10:33:11 2007
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
@@ -34,10 +35,22 @@
 import org.apache.cxf.Bus;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientLifeCycleListener;
+import org.apache.cxf.endpoint.ClientLifeCycleManager;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.endpoint.ServerLifeCycleListener;
+import org.apache.cxf.endpoint.ServerLifeCycleManager;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
 import org.apache.cxf.ws.addressing.RelatesToType;
@@ -46,6 +59,7 @@
 import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
 import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
@@ -55,7 +69,7 @@
 /**
  * 
  */
-public class RMManager {
+public class RMManager implements ServerLifeCycleListener, ClientLifeCycleListener {
 
     private static final Logger LOG = LogUtils.getL7dLogger(RMManager.class);
 
@@ -65,30 +79,33 @@
     private RetransmissionQueue retransmissionQueue;
     private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
     private Timer timer = new Timer(true);
-    private final ServerLifeCycleListener serverLifeCycleListener;
     private RMAssertion rmAssertion;
     private DeliveryAssuranceType deliveryAssurance;
     private SourcePolicyType sourcePolicy;
     private DestinationPolicyType destinationPolicy;
     
+    // ServerLifeCycleListener
+    
+    public void startServer(Server server) {
+        recoverReliableEndpoint(server.getEndpoint(), null);
+    }
 
-    public RMManager() {
-        serverLifeCycleListener = new ServerLifeCycleListener() {
-
-            public void startServer(Server server) {
-            }
-
-            public void stopServer(Server server) {
-                RMManager.this.shutdownReliableEndpoint(server.getEndpoint());
-            }
-            
-        };        
+    public void stopServer(Server server) {
+        shutdownReliableEndpoint(server.getEndpoint());
     }
     
-    public ServerLifeCycleListener getServerLifeCycleListener() {
-        return serverLifeCycleListener;
+    // ClientLifeCycleListener
+    
+    public void clientCreated(Client client) {
+        recoverReliableEndpoint(client.getEndpoint(), client.getConduit());
+    }
+    
+    public void clientDestroyed(Client client) {
+        shutdownReliableEndpoint(client.getEndpoint());
     }
 
+    // Configuration
+    
     public Bus getBus() {
         return bus;
     }
@@ -136,6 +153,85 @@
     public BindingFaultFactory getBindingFaultFactory(Binding binding) {
         return new SoapFaultFactory(binding);
     }
+    
+    /**  
+     * @return Returns the deliveryAssurance.
+     */
+    public DeliveryAssuranceType getDeliveryAssurance() {
+        return deliveryAssurance;
+    }
+
+    /**
+     * @param deliveryAssurance The deliveryAssurance to set.
+     */
+    public void setDeliveryAssurance(DeliveryAssuranceType deliveryAssurance) {
+        this.deliveryAssurance = deliveryAssurance;
+    }
+
+    /**
+     * @return Returns the destinationPolicy.
+     */
+    public DestinationPolicyType getDestinationPolicy() {
+        return destinationPolicy;
+    }
+
+    /**
+     * @param destinationPolicy The destinationPolicy to set.
+     */
+    public void setDestinationPolicy(DestinationPolicyType destinationPolicy) {
+        this.destinationPolicy = destinationPolicy;
+    }
+
+    /** 
+     * @return Returns the rmAssertion.
+     */
+    public RMAssertion getRMAssertion() {
+        return rmAssertion;
+    }
+
+    /**
+     * @param rma The rmAssertion to set.
+     */
+    public void setRMAssertion(RMAssertion rma) {
+        org.apache.cxf.ws.rm.policy.ObjectFactory factory = new org.apache.cxf.ws.rm.policy.ObjectFactory();
+        if (null == rma) {
+            rma = factory.createRMAssertion();
+            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
+        }
+        BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+        if (null == bri) {
+            bri = factory.createRMAssertionBaseRetransmissionInterval();
+            rma.setBaseRetransmissionInterval(bri);
+        }
+        if (null == bri.getMilliseconds()) {
+            bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
+        }
+
+        rmAssertion = rma;
+    }
+
+    /** 
+     * @return Returns the sourcePolicy.
+     */
+    public SourcePolicyType getSourcePolicy() {
+        return sourcePolicy;
+    }
+    
+    /**
+     * @param sp The sourcePolicy to set.
+     */
+    public void setSourcePolicy(SourcePolicyType sp) {
+        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
+        if (null == sp) {
+            sp = factory.createSourcePolicyType();
+        }
+        if (!sp.isSetSequenceTerminationPolicy()) {
+            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+        }
+        sourcePolicy = sp;
+    }
+    
+    // The real stuff ...
 
     public synchronized RMEndpoint getReliableEndpoint(Message message) {
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
@@ -149,7 +245,7 @@
         }
         RMEndpoint rme = reliableEndpoints.get(endpoint);
         if (null == rme) {
-            rme = createReliableEndpoint(this, endpoint);
+            rme = createReliableEndpoint(endpoint);
             org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
             org.apache.cxf.ws.addressing.EndpointReferenceType replyTo = null;
             if (null != destination) {
@@ -232,10 +328,11 @@
 
     @PreDestroy
     public void shutdown() {
-        
         // shutdown remaining endpoints 
         
-        for (RMEndpoint rme : reliableEndpoints.values()) {
+        LOG.log(Level.FINE, "Shutting down RMManager with {0} remaining endpoints.",
+                reliableEndpoints.size());
+        for (RMEndpoint rme : reliableEndpoints.values()) {            
             rme.shutdown();
         }
 
@@ -261,12 +358,72 @@
         reliableEndpoints.remove(e);
     }
     
-    RMEndpoint createReliableEndpoint(RMManager manager, Endpoint endpoint) {
-        return new RMEndpoint(manager, endpoint);
+    void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
+        if (null == store || null == retransmissionQueue) {
+            return;
+        }
+        
+        RMEndpoint rme = createReliableEndpoint(endpoint);        
+        rme.initialise(conduit, null);
+        reliableEndpoints.put(endpoint, rme);
+        
+        String id = RMUtils.getEndpointIdentifier(endpoint);
+        LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+                new Object[] {null == conduit ? "client" : "server", id});
+        Collection<SourceSequence> sss = store.getSourceSequences(id);
+        if (null == sss || 0 == sss.size()) {                        
+            return;
+        }
+        LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
+        for (SourceSequence ss : sss) {
+            rme.getSource().addSequence(ss, false);
+ 
+            Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
+            if (null == ms) {
+                continue;
+            }
+            LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
+            for (RMMessage m : ms) {
+                
+                Message message = new MessageImpl();
+                Exchange exchange = new ExchangeImpl();
+                message.setExchange(exchange);
+                if (null != conduit) {
+                    exchange.setConduit(conduit);
+                    message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+                }
+                exchange.put(Endpoint.class, endpoint);
+                exchange.put(Service.class, endpoint.getService());
+                if (endpoint.getEndpointInfo().getService() != null) {
+                    exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
+                    exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
+                }
+                exchange.put(Binding.class, endpoint.getBinding());
+                exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
+                exchange.put(Bus.class, bus);
+                
+                SequenceType st = RMUtils.getWSRMFactory().createSequenceType();
+                st.setIdentifier(ss.getIdentifier());
+                st.setMessageNumber(m.getMessageNumber());
+                if (ss.isLastMessage() && ss.getCurrentMessageNr().equals(m.getMessageNumber())) {
+                    st.setLastMessage(RMUtils.getWSRMFactory().createSequenceTypeLastMessage());
+                }
+                RMProperties rmps = new RMProperties();
+                rmps.setSequence(st);
+                RMContextUtils.storeRMProperties(message, rmps, true);
+                                    
+                message.setContent(byte[].class, m.getContent());
+                          
+                retransmissionQueue.addUnacknowledged(message);
+            }
+        }
+        retransmissionQueue.start();
+        
     }
     
-    // configuration
-    
+    RMEndpoint createReliableEndpoint(Endpoint endpoint) {
+        return new RMEndpoint(this, endpoint);
+    }  
    
     @PostConstruct
     void initialise() {
@@ -295,6 +452,21 @@
             idGenerator = new DefaultSequenceIdentifierGenerator();
         }
     }
+    
+    @PostConstruct
+    void registerListeners() {
+        if (null == bus) { 
+            return;
+        }
+        ServerLifeCycleManager slm = bus.getExtension(ServerLifeCycleManager.class);
+        if (null != slm) {
+            slm.registerListener(this);
+        }
+        ClientLifeCycleManager clm = bus.getExtension(ClientLifeCycleManager.class);
+        if (null != clm) {
+            clm.registerListener(this);
+        }
+    }
 
    
     Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
@@ -315,81 +487,6 @@
         }
     }
 
-    /**  
-     * @return Returns the deliveryAssurance.
-     */
-    public DeliveryAssuranceType getDeliveryAssurance() {
-        return deliveryAssurance;
-    }
-
-    /**
-     * @param deliveryAssurance The deliveryAssurance to set.
-     */
-    public void setDeliveryAssurance(DeliveryAssuranceType deliveryAssurance) {
-        this.deliveryAssurance = deliveryAssurance;
-    }
-
-    /**
-     * @return Returns the destinationPolicy.
-     */
-    public DestinationPolicyType getDestinationPolicy() {
-        return destinationPolicy;
-    }
-
-    /**
-     * @param destinationPolicy The destinationPolicy to set.
-     */
-    public void setDestinationPolicy(DestinationPolicyType destinationPolicy) {
-        this.destinationPolicy = destinationPolicy;
-    }
-
-    /** 
-     * @return Returns the rmAssertion.
-     */
-    public RMAssertion getRMAssertion() {
-        return rmAssertion;
-    }
-
-    /**
-     * @param rma The rmAssertion to set.
-     */
-    public void setRMAssertion(RMAssertion rma) {
-        org.apache.cxf.ws.rm.policy.ObjectFactory factory = new org.apache.cxf.ws.rm.policy.ObjectFactory();
-        if (null == rma) {
-            rma = factory.createRMAssertion();
-            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
-        }
-        BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
-        if (null == bri) {
-            bri = factory.createRMAssertionBaseRetransmissionInterval();
-            rma.setBaseRetransmissionInterval(bri);
-        }
-        if (null == bri.getMilliseconds()) {
-            bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
-        }
-
-        rmAssertion = rma;
-    }
-
-    /** 
-     * @return Returns the sourcePolicy.
-     */
-    public SourcePolicyType getSourcePolicy() {
-        return sourcePolicy;
-    }
     
-    /**
-     * @param sp The sourcePolicy to set.
-     */
-    public void setSourcePolicy(SourcePolicyType sp) {
-        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
-        if (null == sp) {
-            sp = factory.createSourcePolicyType();
-        }
-        if (!sp.isSetSequenceTerminationPolicy()) {
-            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
-        }
-        sourcePolicy = sp;
-    }
 
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Thu May 24 10:33:11 2007
@@ -21,6 +21,7 @@
 
 import java.math.BigInteger;
 import java.util.Collection;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -112,7 +113,6 @@
         
         if ((isApplicationMessage || isLastMessage)
             && !isPartialResponse) {
-      
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
             }
@@ -120,7 +120,13 @@
             // get the current sequence, requesting the creation of a new one if necessary
             
             synchronized (source) {
-                SourceSequence seq = getManager().getSequence(inSeqId, message, maps);
+                SourceSequence seq = null;
+                if (isLastMessage) {
+                    Map<?, ?> invocationContext = (Map)message.get(Message.INVOCATION_CONTEXT);
+                    seq = (SourceSequence)invocationContext.get(SourceSequence.class.getName());
+                } else {
+                    seq = getManager().getSequence(inSeqId, message, maps);
+                }
                 assert null != seq;
 
                 // increase message number and store a sequence type object in

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java Thu May 24 10:33:11 2007
@@ -19,6 +19,9 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.text.MessageFormat;
+
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.ws.addressing.AddressingConstants;
 import org.apache.cxf.ws.addressing.AddressingConstantsImpl;
 import org.apache.cxf.ws.addressing.VersionTransformer;
@@ -87,4 +90,11 @@
         epr.setAddress(uri);
         return epr;
     } 
+    
+    public static String getEndpointIdentifier(Endpoint endpoint) {
+        return MessageFormat.format("{0}.{1}", new Object[] {
+            endpoint.getEndpointInfo().getService().getName(),
+            endpoint.getEndpointInfo().getName()
+        });
+    }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu May 24 10:33:11 2007
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.util.Collection;
-
 import org.apache.cxf.message.Message;
 
 public interface RetransmissionQueue {
@@ -39,10 +37,9 @@
      * @return true if there are no unacknowledged messages in the queue
      */
     boolean isEmpty();
-   
+    
     /**
-     * Accepts a new context for posible future retransmission.
-     * 
+     * Accepts a new context for posible future retransmission. 
      * @param ctx the message context.
      */
     void addUnacknowledged(Message message);
@@ -64,12 +61,6 @@
      * Stops retransmission queue.
      */
     void stop(SourceSequence seq);
-    
-    /**
-     * Populates the retransmission queue with messages recovered from
-     * persistent store.
-     */
-    void populate(Collection<SourceSequence> sss);
     
     
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Thu May 24 10:33:11 2007
@@ -58,6 +58,9 @@
             LOG.fine("No operation info."); 
             return null;
         }
+        
+        // TODO: throw Fault, see AbstractRMInterceptor
+        
         if (RMConstants.getCreateSequenceOperationName().equals(oi.getName())
             || RMConstants.getCreateSequenceOnewayOperationName().equals(oi.getName())) {
             try {
@@ -73,6 +76,14 @@
             try {
                 createSequenceResponse(createResponse);
             } catch (SequenceFault ex) {
+                ex.printStackTrace();
+            }
+        } else if (RMConstants.getTerminateSequenceOperationName().equals(oi.getName())) {            
+            try {
+                terminateSequence(exchange.getInMessage());
+            } catch (SequenceFault ex) {
+                ex.printStackTrace();
+            } catch (RMException ex) {
                 ex.printStackTrace();
             }
         }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu May 24 10:33:11 2007
@@ -107,10 +107,6 @@
         return 0 == getUnacknowledged().size();
     }
 
-    public void populate(Collection<SourceSequence> sss) {
-        // TODO Auto-generated method stub
-    }
-
     /**
      * Purge all candidates for the given sequence that have been acknowledged.
      * 
@@ -199,14 +195,16 @@
      * 
      * @param ctx the message context.
      * @return ResendCandidate
-     */
+     */    
     protected ResendCandidate cacheUnacknowledged(Message message) {
-        ResendCandidate candidate = null;
         RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
         SequenceType st = rmps.getSequence();
         Identifier sid = st.getIdentifier();
+        String key = sid.getValue();
+        
+        ResendCandidate candidate = null;
+        
         synchronized (this) {
-            String key = sid.getValue();
             List<ResendCandidate> sequenceCandidates = getSequenceCandidates(key);
             if (null == sequenceCandidates) {
                 sequenceCandidates = new ArrayList<ResendCandidate>();
@@ -327,7 +325,15 @@
             }
             ByteArrayOutputStream savedOutputStream = (ByteArrayOutputStream)message
                 .get(RMMessageConstants.SAVED_OUTPUT_STREAM);
-            ByteArrayInputStream bis = new ByteArrayInputStream(savedOutputStream.toByteArray());
+            byte[] content = null;
+            if (null == savedOutputStream) {                
+                content = message.getContent(byte[].class); 
+                LOG.fine("Using saved byte array: " + content);
+            } else {
+                content = savedOutputStream.toByteArray();
+                LOG.fine("Using saved output stream: " + savedOutputStream);
+            }
+            ByteArrayInputStream bis = new ByteArrayInputStream(content);
 
             // copy saved output stream to new output stream in chunks of 1024
             AbstractCachedOutputStream.copyStream(bis, os, 1024);

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java Thu May 24 10:33:11 2007
@@ -48,15 +48,12 @@
     
     @Test
     public void testAccessors() {
-        String n = "abc";
-        EasyMock.expect(rme.getName()).andReturn(n);
         Endpoint ae = control.createMock(Endpoint.class);
         EasyMock.expect(rme.getApplicationEndpoint()).andReturn(ae);
         RMManager mgr = control.createMock(RMManager.class);
         EasyMock.expect(rme.getManager()).andReturn(mgr);
         control.replay();
         AbstractEndpoint tested = new AbstractEndpoint(rme);
-        assertSame(n, tested.getName());
         assertSame(rme, tested.getReliableEndpoint());
         assertSame(ae, tested.getEndpoint());
         assertSame(mgr, tested.getManager());

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java Thu May 24 10:33:11 2007
@@ -164,7 +164,7 @@
         EasyMock.expect(si.getInterface()).andReturn(ii);
         OperationInfo oi = control.createMock(OperationInfo.class);
         EasyMock.expect(ii.getOperation(RMConstants.getLastMessageOperationName())).andReturn(oi);
-        expectInvoke(proxy, oi, null);
+        expectInvokeWithContext(proxy, oi, null);
         control.replay();
         
         proxy.lastMessage(ss);
@@ -350,5 +350,12 @@
     private void expectInvoke(Proxy proxy, OperationInfo oi, Object expectedReturn) throws RMException {
         EasyMock.expect(proxy.invoke(EasyMock.same(oi), EasyMock.isA(Object[].class), 
             (Map)EasyMock.isNull())).andReturn(expectedReturn);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void expectInvokeWithContext(Proxy proxy, OperationInfo oi, Object expectedReturn) 
+        throws RMException {
+        EasyMock.expect(proxy.invoke(EasyMock.same(oi), EasyMock.isA(Object[].class), 
+            EasyMock.isA(Map.class))).andReturn(expectedReturn);
     }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Thu May 24 10:33:11 2007
@@ -84,20 +84,6 @@
     }
 
     @Test
-    public void testGetName() {
-        EndpointInfo aei = control.createMock(EndpointInfo.class);
-        EasyMock.expect(ae.getEndpointInfo()).andReturn(aei).times(2);
-        QName eqn = new QName("ns2", "endpoint");
-        EasyMock.expect(aei.getName()).andReturn(eqn);
-        ServiceInfo asi = control.createMock(ServiceInfo.class);
-        EasyMock.expect(aei.getService()).andReturn(asi);
-        QName sqn = new QName("ns1", "service");
-        EasyMock.expect(asi.getName()).andReturn(sqn);
-        control.replay();
-        assertEquals("{ns1}service.{ns2}endpoint", rme.getName());
-    }
-
-    @Test
     public void testGetManager() {
         control.replay();
         assertSame(manager, rme.getManager());

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Thu May 24 10:33:11 2007
@@ -21,21 +21,29 @@
 
 import java.lang.reflect.Method;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.TimerTask;
 
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.binding.Binding;
 import org.apache.cxf.binding.soap.SoapBinding;
 import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
-import org.apache.cxf.endpoint.ServerLifeCycleListener;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
@@ -45,6 +53,7 @@
 import org.apache.cxf.ws.addressing.RelatesToType;
 import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.easymock.classextension.EasyMock;
@@ -55,22 +64,23 @@
 
 public class RMManagerTest extends Assert {
     
-    private IMocksControl control;
+    private MyControl control;
+    private RMManager manager;
     
     @Before
     public void setUp() {
-        control = EasyMock.createNiceControl();
+        // control = EasyMock.createNiceControl();
+        control = new MyControl();
     }
    
     @Test
     public void testAccessors() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         assertNull(manager.getBus());
         assertNull(manager.getStore());
         assertNull(manager.getRetransmissionQueue());
         assertNotNull(manager.getTimer());
-        assertNotNull(manager.getServerLifeCycleListener());
-        
+
         Bus bus = control.createMock(Bus.class);
         RMStore store = control.createMock(RMStore.class);
         RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
@@ -83,12 +93,11 @@
         assertSame(queue, manager.getRetransmissionQueue());
         control.replay();
         control.verify();
-        
     }
     
     @Test
     public void testInitialisation() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         assertNull("RMAssertion is set.", manager.getRMAssertion());
         assertNull("sourcePolicy is set.", manager.getSourcePolicy());
         assertNull("destinationPolicy is set.", manager.getDestinationPolicy());
@@ -120,25 +129,64 @@
     } 
     
     @Test
-    public void testServerLifecycleLister() {
-        RMManager manager = new RMManager();
+    public void testStartServer() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("recoverReliableEndpoint", new Class[] {Endpoint.class, Conduit.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Server s = control.createMock(Server.class);
         Endpoint e = control.createMock(Endpoint.class);
         EasyMock.expect(s.getEndpoint()).andReturn(e);
+        manager.recoverReliableEndpoint(e, null);
+        EasyMock.expectLastCall();
         control.replay();
-        ServerLifeCycleListener sl = manager.getServerLifeCycleListener();
-        sl.startServer(s);
-        sl.stopServer(s);
+        manager.startServer(s);
         control.verify();
-        
-        control.reset();
+    }
+    
+    @Test
+    public void testStopServer() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("shutdownReliableEndpoint", new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
+        Server s = control.createMock(Server.class);
+        Endpoint e = control.createMock(Endpoint.class);
         EasyMock.expect(s.getEndpoint()).andReturn(e);
-        RMEndpoint rme = control.createMock(RMEndpoint.class);
-        manager.getReliableEndpointsMap().put(e, rme);
-        rme.shutdown();
+        manager.shutdownReliableEndpoint(e);
         EasyMock.expectLastCall();
         control.replay();
-        sl.stopServer(s);
+        manager.stopServer(s);
+        control.verify();
+    }
+    
+    @Test
+    public void testClientCreated() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("recoverReliableEndpoint", new Class[] {Endpoint.class, Conduit.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
+        Client client = control.createMock(Client.class);
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        EasyMock.expect(client.getEndpoint()).andReturn(endpoint);
+        Conduit conduit = control.createMock(Conduit.class);
+        EasyMock.expect(client.getConduit()).andReturn(conduit);
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        EasyMock.expectLastCall();
+        control.replay();
+        manager.clientCreated(client);
+        control.verify();
+    }
+    
+    @Test
+    public void testClientDestroyed() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("shutdownReliableEndpoint", new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
+        Client c = control.createMock(Client.class);
+        Endpoint e = control.createMock(Endpoint.class);
+        EasyMock.expect(c.getEndpoint()).andReturn(e);
+        manager.shutdownReliableEndpoint(e);
+        EasyMock.expectLastCall();
+        control.replay();
+        manager.clientDestroyed(c);
         control.verify();
     }
     
@@ -151,8 +199,8 @@
     @Test
     public void testGetReliableEndpointServerSideCreate() throws NoSuchMethodException {
         Method m = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
-            new Class[] {RMManager.class, Endpoint.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
@@ -166,7 +214,7 @@
         Endpoint e = control.createMock(Endpoint.class);
         EasyMock.expect(wre.getWrappedEndpoint()).andReturn(e);        
         RMEndpoint rme = control.createMock(RMEndpoint.class);
-        EasyMock.expect(manager.createReliableEndpoint(manager, e)).andReturn(rme);
+        EasyMock.expect(manager.createReliableEndpoint(e)).andReturn(rme);
         org.apache.cxf.transport.Destination destination = control
             .createMock(org.apache.cxf.transport.Destination.class);
         EasyMock.expect(exchange.getDestination()).andReturn(destination);
@@ -198,8 +246,8 @@
     @Test
     public void testGetReliableEndpointClientSideCreate() throws NoSuchMethodException {
         Method m = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
-            new Class[] {RMManager.class, Endpoint.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
@@ -211,7 +259,7 @@
         QName name = new QName("http://x.y.z/a", "GreeterPort");
         EasyMock.expect(ei.getName()).andReturn(name);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
-        EasyMock.expect(manager.createReliableEndpoint(manager, endpoint)).andReturn(rme);
+        EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
         EasyMock.expect(exchange.getDestination()).andReturn(null);
         Conduit conduit = control.createMock(Conduit.class);
         EasyMock.expect(exchange.getConduit(message)).andReturn(conduit);
@@ -236,8 +284,8 @@
     @Test
     public void testGetReliableEndpointExisting() throws NoSuchMethodException {
         Method m = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
-            new Class[] {RMManager.class, Endpoint.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
@@ -260,7 +308,7 @@
     public void testGetDestination() throws NoSuchMethodException {
         Method  m = RMManager.class
             .getDeclaredMethod("getReliableEndpoint", new Class[] {Message.class});        
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);    
@@ -282,7 +330,7 @@
     public void testGetSource() throws NoSuchMethodException {
         Method m = RMManager.class
             .getDeclaredMethod("getReliableEndpoint", new Class[] {Message.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);
@@ -304,7 +352,7 @@
     public void testGetExistingSequence() throws NoSuchMethodException, SequenceFault, RMException {
         Method m = RMManager.class
            .getDeclaredMethod("getSource", new Class[] {Message.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         Identifier inSid = control.createMock(Identifier.class);
         
@@ -320,7 +368,7 @@
     @Test
     public void testGetNewSequence() throws NoSuchMethodException, SequenceFault, RMException {
         Method m = RMManager.class.getDeclaredMethod("getSource", new Class[] {Message.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
         EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
@@ -366,7 +414,7 @@
     @Test
     public void testShutdown() {
         Bus bus = new SpringBusFactory().createBus("org/apache/cxf/ws/rm/rmmanager.xml", false);
-        RMManager manager = bus.getExtension(RMManager.class);        
+        manager = bus.getExtension(RMManager.class);        
         Endpoint e = control.createMock(Endpoint.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         manager.getReliableEndpointsMap().put(e, rme);
@@ -385,11 +433,12 @@
         } catch (IllegalStateException ex) {
             // expected
         }
+        control.verify();
     }
     
     @Test
     public void testShutdownReliableEndpoint() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         Endpoint e = control.createMock(Endpoint.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         control.replay();
@@ -402,12 +451,131 @@
         EasyMock.expectLastCall();
         control.replay();
         manager.shutdownReliableEndpoint(e);
-        assertNull(manager.getReliableEndpointsMap().get(e));        
+        assertNull(manager.getReliableEndpointsMap().get(e));  
+        control.verify();
+    }
+    
+    @Test
+    public void testRecoverReliableEndpoint() {
+        manager = new RMManager();
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        Conduit conduit = control.createMock(Conduit.class);
+                
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        control.reset();
+        
+        RMStore store = control.createMock(RMStore.class);
+        manager.setStore(store);
+       
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();           
+    }
+    
+    @Test
+    public void testRecoverReliableClientEndpoint() throws NoSuchMethodException {
+        Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
+                                                          new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {method});
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        EndpointInfo ei = control.createMock(EndpointInfo.class);
+        ServiceInfo si = control.createMock(ServiceInfo.class);  
+        BindingInfo bi = control.createMock(BindingInfo.class);
+        InterfaceInfo ii = control.createMock(InterfaceInfo.class);
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);          
+        Conduit conduit = control.createMock(Conduit.class);        
+        setUpRecoverReliableEndpoint(endpoint, conduit, null, null);
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        control.reset();
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
+        SourceSequence ss = control.createMock(SourceSequence.class);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, null);
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        control.reset();
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);  
+        RMMessage m = control.createMock(RMMessage.class);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, m);        
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();        
+    }
+    
+    Endpoint setUpEndpointForRecovery(Endpoint endpoint, 
+                                      EndpointInfo ei, 
+                                    ServiceInfo si,
+                                    BindingInfo bi,
+                                    InterfaceInfo ii) {   
+        EasyMock.expect(endpoint.getEndpointInfo()).andReturn(ei).anyTimes();     
+        EasyMock.expect(ei.getService()).andReturn(si).anyTimes();
+        EasyMock.expect(si.getName()).andReturn(new QName("S", "s")).anyTimes();
+        EasyMock.expect(ei.getName()).andReturn(new QName("P", "p")).anyTimes();
+        EasyMock.expect(si.getInterface()).andReturn(ii).anyTimes();
+        return endpoint;
+    }
+    
+    void setUpRecoverReliableEndpoint(Endpoint endpoint,
+                                      Conduit conduit, 
+                                      SourceSequence ss, 
+                                      RMMessage m)  {                
+        RMStore store = control.createMock(RMStore.class);
+        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        manager.setStore(store);
+        manager.setRetransmissionQueue(queue);
+        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
+        Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
+        if (null != ss) {
+            sss.add(ss);            
+        }
+        EasyMock.expect(store.getSourceSequences("{S}s.{P}p")).andReturn(sss);
+        if (null == ss) {
+            return;
+        } 
+        Source source = control.createMock(Source.class);
+        EasyMock.expect(rme.getSource()).andReturn(source);
+        source.addSequence(ss, false);
+        EasyMock.expectLastCall();
+        
+        Collection<RMMessage> ms = new ArrayList<RMMessage>();
+        if (null != m) {
+            ms.add(m);
+        }
+        Identifier id = new Identifier();
+        id.setValue("S1");
+        EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
+        EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
+        if (null == m) {
+            return;
+        }
+        Service service = control.createMock(Service.class);
+        EasyMock.expect(endpoint.getService()).andReturn(service);
+        Binding binding = control.createMock(Binding.class);
+        EasyMock.expect(endpoint.getBinding()).andReturn(binding);
+       
+        EasyMock.expect(ss.isLastMessage()).andReturn(true);
+        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN);
+        EasyMock.expect(m.getMessageNumber()).andReturn(BigInteger.TEN).times(2);
+        byte[] content = new byte[] {'x', '9'};
+        EasyMock.expect(m.getContent()).andReturn(content);
+        queue.addUnacknowledged(EasyMock.isA(Message.class));
+        EasyMock.expectLastCall();
+        queue.start();
+        EasyMock.expectLastCall(); 
     }
     
     @Test
     public void testDefaultSequenceIdentifierGenerator() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         assertNull(manager.getIdGenerator());
         SequenceIdentifierGenerator generator = manager.new DefaultSequenceIdentifierGenerator();
         manager.setIdGenerator(generator);
@@ -419,7 +587,41 @@
         assertTrue(id1 != id2);
         assertTrue(!id1.getValue().equals(id2.getValue()));     
         control.replay();
-    }
+    }   
     
-     
+    class MyControl {
+        private IMocksControl c;
+        private List<Object> mocks;
+        
+        MyControl() {
+            c = EasyMock.createNiceControl();
+            mocks = new ArrayList<Object>();
+        }
+        
+        void replay() {
+            c.replay();
+        }
+        
+        void reset() {
+            c.reset();
+        }
+        
+        void verify() {
+            c.verify();
+        }
+        
+        <T> T createMock(Class<T> cls) {
+            T mock = c.createMock(cls);
+            mocks.add(mock);
+            return mock;
+        }
+        
+        <T> T createMock(Class<T> cls, Method[] m) {
+            T mock = c.createMock(cls, m);
+            mocks.add(mock);
+            return mock;
+        }
+        
+         
+    }
 } 



Re: svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Posted by Daniel Kulp <dk...@apache.org>.
Andrea,

If I'm reading this code correctly.....

The ClientLifeCycleManager holds on to all clients with strong 
references.  The only way to release it is to call "destroy()" on the 
client.   

That isn't a "normal" thing to do in a JAX-WS use case.   Thus, for 
normal JAX-WS development, all clients ever created are held onto 
forever.   All the resources they contain are never released.   Etc....

Am I reading that correctly?   If so, -1.   That needs to be fixed.


Dan



On Thursday 24 May 2007 13:33, andreasmyth@apache.org wrote:
> Author: andreasmyth
> Date: Thu May 24 10:33:11 2007
> New Revision: 541364
>
> URL: http://svn.apache.org/viewvc?view=rev&rev=541364
> Log:
> [JIRA CXF-139] Client-side recovery.
> Added client lifecycle management interface to be notified of client
> creation so that recovery can take place before client sends any
> requests. Fixed minor bug in handling TerminateSequence invocation.
>
> Added:
>    
> incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLi
>feCycleListener.java   (with props)
> incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLi
>feCycleManager.java   (with props)
> incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Clie
>ntLifeCycleManagerImpl.java   (with props)
> incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUtil
>sTest.java   (with props)
> incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>m/ClientPersistenceTest.java   (with props) Removed:
>    
> incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>m/PersistenceTest.java Modified:
>    
> incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.j
>ava
> incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/bind
>ing/soap/interceptor/SoapHeaderInterceptor.java
> incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Clie
>ntImpl.java
> incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Serv
>erLifeCycleManagerImpl.java
> incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Abstra
>ctEndpoint.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.
>java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCont
>extUtils.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndp
>oint.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMana
>ger.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutI
>nterceptor.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtil
>s.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Retran
>smissionQueue.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servan
>t.java
> incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/R
>etransmissionQueueImpl.java
> incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Abstra
>ctEndpointTest.java
> incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyT
>est.java
> incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndp
>ointTest.java
> incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMMana
>gerTest.java
> incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>m/oneway-client-crash.xml
>

-- 
J. Daniel Kulp
Principal Engineer
IONA
P: 781-902-8727    C: 508-380-7194
daniel.kulp@iona.com
http://www.dankulp.com/blog

Re: svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Posted by Daniel Kulp <dk...@apache.org>.
On Thursday 24 May 2007 17:54, Andrea Smyth wrote:
> Yes, and this can be improved upon slightly by creating the RMEndpoint
> only when there actually are messages to recover and resend. Otherwise
> the creation of the RMEndpoint can be deferred until the client makes
> its first request. Normally that should not make much of a difference
> though.
> If there is concern about the growth of the reliableEndpoint map in an
> application with many shortlived clients using RM, 

I'm more concerned about the case where the application has one client 
using RM (so the store/etc... is created), and a BUNCH of other short 
lived clients not using RM.    

Right now, it looks like all the other short lived clients end up in that 
map where they are held onto forever. 

> I would recommend 
> that the application tells the runtime when it's done with a
> particular client as the RMEndpoint object will then be removed from
> the map.

My point is that from a pure JAX-WS application, there isn't a way to do 
that.   By default, it cannot hold onto anything strongly.


> >It looks like for normal cases (RM not used), store is null so it's
> > not a problem.
> >
> >However, my major issue, I guess, is the API kind of implies that
> >the "ClientLifeCycleManager.clientDestroyed(...)" method will always
> > be called.   I just don't see that as being true.   I actually
> > expect it to rarely be called.
>
> It's only called when  the application explicitly informs the runtime
> that a is destroyed. In connection with RM users may want  to
> terminate the client's current sequence when it is clear that no more
> messages will be sent in the context of this sequence.
> Also, clients that use non-anonymous ReplyTo, with or without RM, may
> want to indicate that the ReplyTo, AcksTo listener on the client side
> can be shut down.
> If you prefer other names for the APIs I can of course change them.

No, the name is fine.   However, it needs some strong javadocs that warn 
against holding onto stuff using strong references and also warning that 
clientDestroyed is likely to not be called at all.

Honestly, I think the RM stuff should change to using WeakReferences and 
a ReferenceQueue for almost everything.   When the client is garbage 
collected, it can then clean things up.    Other 
ClientLifeCycleListeners should be encouraged to do the same thing.

Looking at the code, RM just uses the client to get the endpoint.  Thus, 
you probably could do something like:
Map<WeakReference<Client>, Endpoint>....

when the client is GC'd, you could still use the endpoint to clean things 
up.   Slightly tricky, but certainly more reliable than expecting users 
to call a non-existent (according to spec) destroy method. 

The other option is to put finalize methods on the clients, but I hate 
doing that cause finalize methods KILL GC performance.


-- 
J. Daniel Kulp
Principal Engineer
IONA
P: 781-902-8727    C: 508-380-7194
daniel.kulp@iona.com
http://www.dankulp.com/blog

Re: svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Posted by Andrea Smyth <an...@iona.com>.
Daniel Kulp wrote:

>On Thursday 24 May 2007 16:15, Andrea Smyth wrote:
>  
>
>>>If I'm reading this code correctly.....
>>>
>>>The ClientLifeCycleManager holds on to all clients with strong
>>>references.  The only way to release it is to call "destroy()" on the
>>>client.
>>>      
>>>
>>The ClientLifeCycleManager (implementation) holds on to all client
>>lifecycle *listeners*, not the clients.
>>What exactly are yoy referring to?
>>    
>>
>
>Sorry.   My bad.
>
>However, in the RMManager, all clients that are created when RMManager is 
>registered as a ClientLifeCycleListener go into 
>RMManger.recoverReliableEndpoint(...).   If there is a store and 
>recoverReliableEndpoint, then for all clients, a RMEndpoint object is 
>created and the endpoint and RMEndpoint objects are put in the 
>reliableEndpoints map, which is a strong map.
>
>Right?
>  
>
Yes, and this can be improved upon slightly by creating the RMEndpoint 
only when there actually are messages to recover and resend. Otherwise 
the creation of the RMEndpoint can be deferred until the client makes 
its first request. Normally that should not make much of a difference 
though.
If there is concern about the growth of the reliableEndpoint map in an 
application with many shortlived clients using RM, I would recommend 
that the application tells the runtime when it's done with a particular 
client as the RMEndpoint object will then be removed from the map.

>It looks like for normal cases (RM not used), store is null so it's not a 
>problem.
>
>However, my major issue, I guess, is the API kind of implies that 
>the "ClientLifeCycleManager.clientDestroyed(...)" method will always be 
>called.   I just don't see that as being true.   I actually expect it to 
>rarely be called.
>  
>

It's only called when  the application explicitly informs the runtime 
that a is destroyed. In connection with RM users may want  to terminate 
the client's current sequence when it is clear that no more messages 
will be sent in the context of this sequence.
Also, clients that use non-anonymous ReplyTo, with or without RM, may 
want to indicate that the ReplyTo, AcksTo listener on the client side 
can be shut down.
If you prefer other names for the APIs I can of course change them.

Andrea.

>Dan
>
>
>
>  
>
>>Andrea.
>>
>>    
>>
>>>That isn't a "normal" thing to do in a JAX-WS use case.   Thus, for
>>>normal JAX-WS development, all clients ever created are held onto
>>>forever.   All the resources they contain are never released.  
>>>Etc....
>>>
>>>Am I reading that correctly?   If so, -1.   That needs to be fixed.
>>>
>>>
>>>Dan
>>>
>>>On Thursday 24 May 2007 13:33, andreasmyth@apache.org wrote:
>>>      
>>>
>>>>Author: andreasmyth
>>>>Date: Thu May 24 10:33:11 2007
>>>>New Revision: 541364
>>>>
>>>>URL: http://svn.apache.org/viewvc?view=rev&rev=541364
>>>>Log:
>>>>[JIRA CXF-139] Client-side recovery.
>>>>Added client lifecycle management interface to be notified of client
>>>>creation so that recovery can take place before client sends any
>>>>requests. Fixed minor bug in handling TerminateSequence invocation.
>>>>
>>>>Added:
>>>>
>>>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client
>>>>Li feCycleListener.java   (with props)
>>>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client
>>>>Li feCycleManager.java   (with props)
>>>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Cl
>>>>ie ntLifeCycleManagerImpl.java   (with props)
>>>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUt
>>>>il sTest.java   (with props)
>>>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws
>>>>/r m/ClientPersistenceTest.java   (with props) Removed:
>>>>
>>>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws
>>>>/r m/PersistenceTest.java Modified:
>>>>
>>>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client
>>>>.j ava
>>>>incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/bi
>>>>nd ing/soap/interceptor/SoapHeaderInterceptor.java
>>>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Cl
>>>>ie ntImpl.java
>>>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Se
>>>>rv erLifeCycleManagerImpl.java
>>>>incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Abst
>>>>ra ctEndpoint.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Prox
>>>>y. java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCo
>>>>nt extUtils.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEn
>>>>dp oint.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMa
>>>>na ger.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOu
>>>>tI nterceptor.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUt
>>>>il s.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Retr
>>>>an smissionQueue.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Serv
>>>>an t.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap
>>>>/R etransmissionQueueImpl.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Abst
>>>>ra ctEndpointTest.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Prox
>>>>yT est.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEn
>>>>dp ointTest.java
>>>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMMa
>>>>na gerTest.java
>>>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws
>>>>/r m/oneway-client-crash.xml
>>>>        
>>>>
>
>  
>


Re: svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Posted by Daniel Kulp <da...@iona.com>.
On Thursday 24 May 2007 16:15, Andrea Smyth wrote:
> >If I'm reading this code correctly.....
> >
> >The ClientLifeCycleManager holds on to all clients with strong
> >references.  The only way to release it is to call "destroy()" on the
> >client.
>
> The ClientLifeCycleManager (implementation) holds on to all client
> lifecycle *listeners*, not the clients.
> What exactly are yoy referring to?

Sorry.   My bad.

However, in the RMManager, all clients that are created when RMManager is 
registered as a ClientLifeCycleListener go into 
RMManger.recoverReliableEndpoint(...).   If there is a store and 
recoverReliableEndpoint, then for all clients, a RMEndpoint object is 
created and the endpoint and RMEndpoint objects are put in the 
reliableEndpoints map, which is a strong map.

Right?

It looks like for normal cases (RM not used), store is null so it's not a 
problem.

However, my major issue, I guess, is the API kind of implies that 
the "ClientLifeCycleManager.clientDestroyed(...)" method will always be 
called.   I just don't see that as being true.   I actually expect it to 
rarely be called.

Dan



>
> Andrea.
>
> >That isn't a "normal" thing to do in a JAX-WS use case.   Thus, for
> >normal JAX-WS development, all clients ever created are held onto
> >forever.   All the resources they contain are never released.  
> > Etc....
> >
> >Am I reading that correctly?   If so, -1.   That needs to be fixed.
> >
> >
> >Dan
> >
> >On Thursday 24 May 2007 13:33, andreasmyth@apache.org wrote:
> >>Author: andreasmyth
> >>Date: Thu May 24 10:33:11 2007
> >>New Revision: 541364
> >>
> >>URL: http://svn.apache.org/viewvc?view=rev&rev=541364
> >>Log:
> >>[JIRA CXF-139] Client-side recovery.
> >>Added client lifecycle management interface to be notified of client
> >>creation so that recovery can take place before client sends any
> >>requests. Fixed minor bug in handling TerminateSequence invocation.
> >>
> >>Added:
> >>
> >>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client
> >>Li feCycleListener.java   (with props)
> >>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client
> >>Li feCycleManager.java   (with props)
> >>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Cl
> >>ie ntLifeCycleManagerImpl.java   (with props)
> >>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUt
> >>il sTest.java   (with props)
> >>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws
> >>/r m/ClientPersistenceTest.java   (with props) Removed:
> >>
> >>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws
> >>/r m/PersistenceTest.java Modified:
> >>
> >>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client
> >>.j ava
> >>incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/bi
> >>nd ing/soap/interceptor/SoapHeaderInterceptor.java
> >>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Cl
> >>ie ntImpl.java
> >>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Se
> >>rv erLifeCycleManagerImpl.java
> >>incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Abst
> >>ra ctEndpoint.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Prox
> >>y. java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCo
> >>nt extUtils.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEn
> >>dp oint.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMa
> >>na ger.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOu
> >>tI nterceptor.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUt
> >>il s.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Retr
> >>an smissionQueue.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Serv
> >>an t.java
> >>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap
> >>/R etransmissionQueueImpl.java
> >>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Abst
> >>ra ctEndpointTest.java
> >>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Prox
> >>yT est.java
> >>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEn
> >>dp ointTest.java
> >>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMMa
> >>na gerTest.java
> >>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws
> >>/r m/oneway-client-crash.xml

-- 
J. Daniel Kulp
Principal Engineer
IONA
P: 781-902-8727    C: 508-380-7194
daniel.kulp@iona.com
http://www.dankulp.com/blog

Re: svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Posted by Andrea Smyth <an...@iona.com>.
Daniel Kulp wrote:

>Andrea,
>
>If I'm reading this code correctly.....
>
>The ClientLifeCycleManager holds on to all clients with strong 
>references.  The only way to release it is to call "destroy()" on the 
>client.   
>  
>
The ClientLifeCycleManager (implementation) holds on to all client 
lifecycle *listeners*, not the clients.
What exactly are yoy referring to?

Andrea.

>That isn't a "normal" thing to do in a JAX-WS use case.   Thus, for 
>normal JAX-WS development, all clients ever created are held onto 
>forever.   All the resources they contain are never released.   Etc....
>
>Am I reading that correctly?   If so, -1.   That needs to be fixed.
>
>
>Dan
>
>
>
>On Thursday 24 May 2007 13:33, andreasmyth@apache.org wrote:
>  
>
>>Author: andreasmyth
>>Date: Thu May 24 10:33:11 2007
>>New Revision: 541364
>>
>>URL: http://svn.apache.org/viewvc?view=rev&rev=541364
>>Log:
>>[JIRA CXF-139] Client-side recovery.
>>Added client lifecycle management interface to be notified of client
>>creation so that recovery can take place before client sends any
>>requests. Fixed minor bug in handling TerminateSequence invocation.
>>
>>Added:
>>   
>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLi
>>feCycleListener.java   (with props)
>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLi
>>feCycleManager.java   (with props)
>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Clie
>>ntLifeCycleManagerImpl.java   (with props)
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUtil
>>sTest.java   (with props)
>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>>m/ClientPersistenceTest.java   (with props) Removed:
>>   
>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>>m/PersistenceTest.java Modified:
>>   
>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.j
>>ava
>>incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/bind
>>ing/soap/interceptor/SoapHeaderInterceptor.java
>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Clie
>>ntImpl.java
>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Serv
>>erLifeCycleManagerImpl.java
>>incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Abstra
>>ctEndpoint.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.
>>java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCont
>>extUtils.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndp
>>oint.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMana
>>ger.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutI
>>nterceptor.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtil
>>s.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Retran
>>smissionQueue.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servan
>>t.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/R
>>etransmissionQueueImpl.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Abstra
>>ctEndpointTest.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyT
>>est.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndp
>>ointTest.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMMana
>>gerTest.java
>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>>m/oneway-client-crash.xml
>>
>>    
>>
>
>  
>


Re: svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...

Posted by Andrea Smyth <an...@iona.com>.
Daniel Kulp wrote:

>Andrea,
>
>If I'm reading this code correctly.....
>
>The ClientLifeCycleManager holds on to all clients with strong 
>references.  The only way to release it is to call "destroy()" on the 
>client.   
>  
>
The ClientLifeCycleManager (implementation) holds on to all client 
lifecycle *listeners*, not the clients.
What exactly are you referring to?

Andrea.

>That isn't a "normal" thing to do in a JAX-WS use case.   Thus, for 
>normal JAX-WS development, all clients ever created are held onto 
>forever.   All the resources they contain are never released.   Etc....
>
>Am I reading that correctly?   If so, -1.   That needs to be fixed.
>
>
>Dan
>
>
>
>On Thursday 24 May 2007 13:33, andreasmyth@apache.org wrote:
>  
>
>>Author: andreasmyth
>>Date: Thu May 24 10:33:11 2007
>>New Revision: 541364
>>
>>URL: http://svn.apache.org/viewvc?view=rev&rev=541364
>>Log:
>>[JIRA CXF-139] Client-side recovery.
>>Added client lifecycle management interface to be notified of client
>>creation so that recovery can take place before client sends any
>>requests. Fixed minor bug in handling TerminateSequence invocation.
>>
>>Added:
>>   
>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLi
>>feCycleListener.java   (with props)
>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLi
>>feCycleManager.java   (with props)
>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Clie
>>ntLifeCycleManagerImpl.java   (with props)
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUtil
>>sTest.java   (with props)
>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>>m/ClientPersistenceTest.java   (with props) Removed:
>>   
>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>>m/PersistenceTest.java Modified:
>>   
>>incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.j
>>ava
>>incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/bind
>>ing/soap/interceptor/SoapHeaderInterceptor.java
>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Clie
>>ntImpl.java
>>incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/Serv
>>erLifeCycleManagerImpl.java
>>incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Abstra
>>ctEndpoint.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.
>>java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCont
>>extUtils.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndp
>>oint.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMana
>>ger.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutI
>>nterceptor.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtil
>>s.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Retran
>>smissionQueue.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servan
>>t.java
>>incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/R
>>etransmissionQueueImpl.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/Abstra
>>ctEndpointTest.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyT
>>est.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndp
>>ointTest.java
>>incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMMana
>>gerTest.java
>>incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/r
>>m/oneway-client-crash.xml
>>
>>    
>>
>
>  
>