You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2008/09/02 21:56:43 UTC
svn commit: r691355 - in /cxf/trunk: api/src/main/java/org/apache/cxf/io/
rt/core/src/main/java/org/apache/cxf/interceptor/
rt/core/src/main/java/org/apache/cxf/io/
rt/transports/http/src/main/java/org/apache/cxf/transport/http/
rt/ws/addr/src/main/jav...
Author: dkulp
Date: Tue Sep 2 12:56:42 2008
New Revision: 691355
URL: http://svn.apache.org/viewvc?rev=691355&view=rev
Log:
[CXF-1778] Fix memory leak that occurs when WS-Addressing is turned on for the client but the server doesn't respond with addressing information.
Also reduce memory usage by clearing cached streams and other content held in the messages sooner.
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/CachedOutputStream.java Tue Sep 2 12:56:42 2008
@@ -186,6 +186,10 @@
* @throws IOException
*/
public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
+ if (out == null) {
+ out = new ByteArrayOutputStream();
+ }
+
if (currentStream instanceof CachedOutputStream) {
CachedOutputStream ac = (CachedOutputStream) currentStream;
InputStream in = ac.getInputStream();
@@ -210,6 +214,7 @@
if (copyOldContent) {
IOUtils.copyAndCloseInput(fin, out);
}
+ streamList.remove(currentStream);
tempFile.delete();
tempFile = null;
inmem = true;
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/LoggingOutInterceptor.java Tue Sep 2 12:56:42 2008
@@ -74,16 +74,18 @@
// Write the output while caching it for the log message
final CacheAndWriteOutputStream newOut = new CacheAndWriteOutputStream(os);
message.setContent(OutputStream.class, newOut);
- newOut.registerCallback(new LoggingCallback(message));
+ newOut.registerCallback(new LoggingCallback(message, os));
}
}
class LoggingCallback implements CachedOutputStreamCallback {
private final Message message;
+ private final OutputStream origStream;
- public LoggingCallback(final Message msg) {
+ public LoggingCallback(final Message msg, final OutputStream os) {
this.message = msg;
+ this.origStream = os;
}
public void onFlush(CachedOutputStream cos) {
@@ -128,6 +130,15 @@
} else if (LOG.isLoggable(Level.INFO)) {
LOG.info(buffer.toString());
}
+ try {
+ //empty out the cache
+ cos.lockOutputStream();
+ cos.resetOut(null, false);
+ } catch (Exception ex) {
+ //ignore
+ }
+ message.setContent(OutputStream.class,
+ origStream);
}
}
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/interceptor/StaxOutInterceptor.java Tue Sep 2 12:56:42 2008
@@ -157,10 +157,12 @@
xtw.writeEndDocument();
xtw.close();
}
+
OutputStream os = (OutputStream)message.get(OUTPUT_STREAM_HOLDER);
if (os != null) {
message.setContent(OutputStream.class, os);
}
+ message.removeContent(XMLStreamWriter.class);
} catch (XMLStreamException e) {
throw new Fault(new org.apache.cxf.common.i18n.Message("STAX_WRITE_EXC", BUNDLE), e);
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/io/CacheAndWriteOutputStream.java Tue Sep 2 12:56:42 2008
@@ -47,6 +47,10 @@
flowThroughStream.close();
}
+ public OutputStream getFlowThroughStream() {
+ return flowThroughStream;
+ }
+
@Override
protected void onWrite() throws IOException {
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Tue Sep 2 12:56:42 2008
@@ -1864,7 +1864,7 @@
try {
handleResponse();
} finally {
- if (cachingForRetransmission) {
+ if (cachingForRetransmission && cachedStream != null) {
cachedStream.close();
}
}
@@ -1974,6 +1974,14 @@
connection.getInputStream().close();
return;
}
+ } else {
+ //not going to be resending or anything, clear out the stuff in the out message
+ //to free memory
+ outMessage.removeContent(OutputStream.class);
+ if (cachingForRetransmission) {
+ cachedStream.close();
+ }
+ cachedStream = null;
}
Message inMessage = new MessageImpl();
@@ -2026,6 +2034,7 @@
}
inMessage.setContent(InputStream.class, in);
+
incomingObserver.onMessage(inMessage);
}
Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java Tue Sep 2 12:56:42 2008
@@ -63,6 +63,7 @@
* Properties for outgoing messages.
*/
public class MAPAggregator extends AbstractPhaseInterceptor<Message> {
+ public static final String USING_ADDRESSING = MAPAggregator.class.getName() + ".usingAddressing";
private static final Logger LOG =
LogUtils.getL7dLogger(MAPAggregator.class);
@@ -77,11 +78,6 @@
protected final Map<String, String> messageIDs =
new ConcurrentHashMap<String, String>();
- /**
- * Whether the endpoint supports WS-Addressing.
- */
-
- private final Map<Endpoint, Boolean> usingAddressing = new ConcurrentHashMap<Endpoint, Boolean>();
private boolean usingAddressingAdvisory = true;
private boolean addressingRequired;
@@ -203,7 +199,7 @@
boolean ret = false;
Endpoint endpoint = message.getExchange().get(Endpoint.class);
if (null != endpoint) {
- Boolean b = usingAddressing.get(endpoint);
+ Boolean b = (Boolean)endpoint.get(USING_ADDRESSING);
if (null == b) {
EndpointInfo endpointInfo = endpoint.getEndpointInfo();
List<ExtensibilityElement> endpointExts = endpointInfo != null ? endpointInfo
@@ -217,7 +213,7 @@
ret = hasUsingAddressing(endpointExts) || hasUsingAddressing(bindingExts)
|| hasUsingAddressing(serviceExts);
b = ret ? Boolean.TRUE : Boolean.FALSE;
- usingAddressing.put(endpoint, b);
+ endpoint.put(USING_ADDRESSING, b);
} else {
ret = b.booleanValue();
}
Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java Tue Sep 2 12:56:42 2008
@@ -19,13 +19,12 @@
package org.apache.cxf.ws.addressing.soap;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -80,8 +79,8 @@
* REVISIT: map usage that the *same* interceptor instance
* is used in all chains.
*/
- protected final Map<String, Exchange> uncorrelatedExchanges =
- Collections.synchronizedMap(new HashMap<String, Exchange>());
+ protected final Map<String, Exchange> uncorrelatedExchanges
+ = new ConcurrentHashMap<String, Exchange>();
private VersionTransformer transformer;
private HeaderFactory headerFactory;
@@ -734,7 +733,18 @@
LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
message.getInterceptorChain().abort();
}
+ } else if (maps == null && isRequestor(message)) {
+ Message m = message.getExchange().getOutMessage();
+ maps = ContextUtils.retrieveMAPs(m, false, true, false);
+ if (maps != null) {
+ Exchange ex = uncorrelatedExchanges.get(maps.getMessageID().getValue());
+ if (ex == message.getExchange()) {
+ uncorrelatedExchanges.remove(maps.getMessageID().getValue());
+ LOG.log(Level.WARNING, "RESPONSE_NOT_USING_WSADDRESSING");
+ }
+ }
}
+
}
/**
Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/Messages.properties Tue Sep 2 12:56:42 2008
@@ -23,3 +23,4 @@
UNSUPPORTED_VERSION_MSG = Unsupported WS-Addressing version {0}
IGNORE_NON_ELEMENT_REF_PARAM_MSG = Ignoring reference parameter {0} because it is not a JAXBElement
CORRELATION_FAILURE_MSG = Failed to correlate message, aborting dispatch.
+RESPONSE_NOT_USING_WSADDRESSING = Response message does not contain WS-Addressing properties. Not correlating response.
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java?rev=691355&r1=691354&r2=691355&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/WrappedEndpoint.java Tue Sep 2 12:56:42 2008
@@ -32,12 +32,14 @@
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.MAPAggregator;
public class WrappedEndpoint implements Endpoint {
private Endpoint wrappedEndpoint;
private EndpointInfo endpointInfo;
private Service service;
+ private Boolean usingAddressing;
WrappedEndpoint(Endpoint wrapped, EndpointInfo info, Service s) {
wrappedEndpoint = wrapped;
@@ -118,6 +120,9 @@
}
public Object get(Object key) {
+ if (MAPAggregator.USING_ADDRESSING == key) {
+ return usingAddressing;
+ }
return wrappedEndpoint.get(key);
}
@@ -130,6 +135,10 @@
}
public Object put(String key, Object value) {
+ if (MAPAggregator.USING_ADDRESSING == key) {
+ usingAddressing = (Boolean)value;
+ return null;
+ }
return wrappedEndpoint.put(key, value);
}