You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2008/09/03 20:41:24 UTC

svn commit: r691721 - in /cxf/branches/2.0.x-fixes: ./ api/src/main/java/org/apache/cxf/io/ rt/core/src/main/java/org/apache/cxf/interceptor/ rt/core/src/main/java/org/apache/cxf/io/ rt/transports/http/src/main/java/org/apache/cxf/transport/http/ rt/ws...

Author: dkulp
Date: Wed Sep  3 11:41:24 2008
New Revision: 691721

URL: http://svn.apache.org/viewvc?rev=691721&view=rev
Log:
Merged revisions 691357 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/branches/2.1.x-fixes

................
  r691357 | dkulp | 2008-09-02 15:58:45 -0400 (Tue, 02 Sep 2008) | 10 lines
  
  Merged revisions 691355 via svnmerge from 
  https://svn.apache.org/repos/asf/cxf/trunk
  
  ........
    r691355 | dkulp | 2008-09-02 15:56:42 -0400 (Tue, 02 Sep 2008) | 4 lines
    
    [CXF-1778] Fix memory leak that occurs when WS-Addressing is enabled on the client but the server doesn't respond with addressing information.
    Also reduce memory usage by clearing content held in the messages sooner.
  ........
................

Modified:
    cxf/branches/2.0.x-fixes/   (props changed)
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java
    cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java
    cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
    cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties
    cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  3 11:41:24 2008
@@ -1,3 +1,3 @@
-/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316
-/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271
+/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357
+/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355
 /incubator/cxf/trunk:434594-651668

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Sep  3 11:41:24 2008
@@ -1 +1 @@
-/cxf/branches/2.1.x-fixes:1-686313,686315-686332,686334-686346,686348-686828,687097,687464-687465,689109,689112,689122,691316
+/cxf/branches/2.1.x-fixes:1-686313,686315-686332,686334-686346,686348-686828,687097,687464-687465,689109,689112,689122,691316,691357

Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original)
+++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java Wed Sep  3 11:41:24 2008
@@ -186,6 +186,10 @@
      * @throws IOException
      */
     public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
+        if (out == null) {
+            out = new ByteArrayOutputStream();
+        }
+
         if (currentStream instanceof CachedOutputStream) {
             CachedOutputStream ac = (CachedOutputStream) currentStream;
             InputStream in = ac.getInputStream();
@@ -210,6 +214,7 @@
                 if (copyOldContent) {
                     IOUtils.copyAndCloseInput(fin, out);
                 }
+                streamList.remove(currentStream);
                 tempFile.delete();
                 tempFile = null;
                 inmem = true;

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java Wed Sep  3 11:41:24 2008
@@ -74,16 +74,18 @@
             // Write the output while caching it for the log message
             final CacheAndWriteOutputStream newOut = new CacheAndWriteOutputStream(os);
             message.setContent(OutputStream.class, newOut);
-            newOut.registerCallback(new LoggingCallback(message));
+            newOut.registerCallback(new LoggingCallback(message, os));
         }
     }
 
     class LoggingCallback implements CachedOutputStreamCallback {
         
         private final Message message;
+        private final OutputStream origStream;
         
-        public LoggingCallback(final Message msg) {
+        public LoggingCallback(final Message msg, final OutputStream os) {
             this.message = msg;
+            this.origStream = os;
         }
 
         public void onFlush(CachedOutputStream cos) {  
@@ -128,6 +130,15 @@
             } else if (LOG.isLoggable(Level.INFO)) {
                 LOG.info(buffer.toString());
             }
+            try {
+                //empty out the cache
+                cos.lockOutputStream();
+                cos.resetOut(null, false);
+            } catch (Exception ex) {
+                //ignore
+            }
+            message.setContent(OutputStream.class, 
+                               origStream);
         }
     } 
 }

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java Wed Sep  3 11:41:24 2008
@@ -137,6 +137,7 @@
                     xtw.writeEndDocument();
                     xtw.close();
                 }
+                message.removeContent(XMLStreamWriter.class);
             } catch (XMLStreamException e) {
                 throw new Fault(new org.apache.cxf.common.i18n.Message("STAX_WRITE_EXC", BUNDLE), e);
             }

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java Wed Sep  3 11:41:24 2008
@@ -47,6 +47,10 @@
         flowThroughStream.close();
     }
     
+    public OutputStream getFlowThroughStream() {
+        return flowThroughStream;
+    }
+    
     
     @Override
     protected void onWrite() throws IOException {

Modified: cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Sep  3 11:41:24 2008
@@ -1827,7 +1827,7 @@
             try {
                 handleResponse();
             } finally {
-                if (cachingForRetransmission) {
+                if (cachingForRetransmission && cachedStream != null) {
                     cachedStream.close();
                 }
             }
@@ -1937,6 +1937,14 @@
                     connection.getInputStream().close();
                     return;
                 }
+            } else {
+                //not going to be resending or anything, clear out the stuff in the out message
+                //to free memory
+                outMessage.removeContent(OutputStream.class);
+                if (cachingForRetransmission) {
+                    cachedStream.close();
+                }
+                cachedStream = null;
             }
             
             Message inMessage = new MessageImpl();
@@ -1989,6 +1997,7 @@
             }
             inMessage.setContent(InputStream.class, in);
             
+            
             incomingObserver.onMessage(inMessage);
         }
     }

Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java Wed Sep  3 11:41:24 2008
@@ -60,6 +60,7 @@
  * Properties for outgoing messages.
  */
 public class MAPAggregator extends AbstractPhaseInterceptor<Message> {
+    public static final String USING_ADDRESSING = MAPAggregator.class.getName() + ".usingAddressing";
 
     private static final Logger LOG = 
         LogUtils.getL7dLogger(MAPAggregator.class);
@@ -74,11 +75,6 @@
     protected final Map<String, String> messageIDs = 
         new ConcurrentHashMap<String, String>();
     
-    /**
-     * Whether the endpoint supports WS-Addressing.
-     */
-
-    private final Map<Endpoint, Boolean> usingAddressing = new ConcurrentHashMap<Endpoint, Boolean>();
     private boolean usingAddressingAdvisory = true;
 
     private boolean allowDuplicates = true;
@@ -183,7 +179,7 @@
         boolean ret = false;
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
         if (null != endpoint) {
-            Boolean b = usingAddressing.get(endpoint);
+            Boolean b = (Boolean)endpoint.get(USING_ADDRESSING);
             if (null == b) {
                 EndpointInfo endpointInfo = endpoint.getEndpointInfo();
                 List<ExtensibilityElement> endpointExts = endpointInfo != null ? endpointInfo
@@ -197,7 +193,7 @@
                 ret = hasUsingAddressing(endpointExts) || hasUsingAddressing(bindingExts)
                              || hasUsingAddressing(serviceExts);
                 b = ret ? Boolean.TRUE : Boolean.FALSE;
-                usingAddressing.put(endpoint, b);
+                endpoint.put(USING_ADDRESSING, b);
             } else {
                 ret = b.booleanValue();
             }

Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java Wed Sep  3 11:41:24 2008
@@ -19,13 +19,12 @@
 
 package org.apache.cxf.ws.addressing.soap;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -80,8 +79,8 @@
      * REVISIT: map usage that the *same* interceptor instance 
      * is used in all chains.
      */
-    protected final Map<String, Exchange> uncorrelatedExchanges =
-        Collections.synchronizedMap(new HashMap<String, Exchange>());
+    protected final Map<String, Exchange> uncorrelatedExchanges 
+        = new ConcurrentHashMap<String, Exchange>();
 
     private VersionTransformer transformer;
     private HeaderFactory headerFactory;
@@ -734,7 +733,18 @@
                 LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
                 message.getInterceptorChain().abort();
             }
+        } else if (maps == null && isRequestor(message)) {
+            Message m = message.getExchange().getOutMessage();
+            maps = ContextUtils.retrieveMAPs(m, false, true, false);
+            if (maps != null) {
+                Exchange ex = uncorrelatedExchanges.get(maps.getMessageID().getValue());
+                if (ex == message.getExchange()) {
+                    uncorrelatedExchanges.remove(maps.getMessageID().getValue());
+                    LOG.log(Level.WARNING, "RESPONSE_NOT_USING_WSADDRESSING");
+                }
+            }
         }
+        
     }
 
     /**

Modified: cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties Wed Sep  3 11:41:24 2008
@@ -23,3 +23,4 @@
 UNSUPPORTED_VERSION_MSG = Unsupported WS-Addressing version {0}
 IGNORE_NON_ELEMENT_REF_PARAM_MSG = Ignoring reference parameter {0} because it is not a JAXBElement
 CORRELATION_FAILURE_MSG = Failed to correlate message, aborting dispatch.
+RESPONSE_NOT_USING_WSADDRESSING = Response message does not contain WS-Addressing properties.  Not correlating response.

Modified: cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java?rev=691721&r1=691720&r2=691721&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java Wed Sep  3 11:41:24 2008
@@ -32,12 +32,14 @@
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.MAPAggregator;
 
 public class WrappedEndpoint implements Endpoint {
 
     private Endpoint wrappedEndpoint;
     private EndpointInfo endpointInfo;
     private Service service;
+    private Boolean usingAddressing;
     
     WrappedEndpoint(Endpoint wrapped, EndpointInfo info, Service s) {
         wrappedEndpoint = wrapped;
@@ -118,6 +120,9 @@
     }
 
     public Object get(Object key) {
+        if (MAPAggregator.USING_ADDRESSING == key) {
+            return usingAddressing;
+        }
         return wrappedEndpoint.get(key);
     }
 
@@ -130,6 +135,10 @@
     }
 
     public Object put(String key, Object value) {
+        if (MAPAggregator.USING_ADDRESSING == key) {
+            usingAddressing = (Boolean)value;
+            return null;
+        }
         return wrappedEndpoint.put(key, value);
     }