You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2010/01/19 19:14:17 UTC

svn commit: r900876 - in /cxf/trunk: api/src/main/java/org/apache/cxf/io/DelegatingInputStream.java rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java

Author: dkulp
Date: Tue Jan 19 18:14:17 2010
New Revision: 900876

URL: http://svn.apache.org/viewvc?rev=900876&view=rev
Log:
[CXF-2619] Make sure we cache incoming content on first write or we
could hit a deadlock

Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/io/DelegatingInputStream.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/io/DelegatingInputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/DelegatingInputStream.java?rev=900876&r1=900875&r2=900876&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/DelegatingInputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/DelegatingInputStream.java Tue Jan 19 18:14:17 2010
@@ -55,12 +55,15 @@
      * stream may not be valid by the time the next read() occurs
      */
     public void cacheInput() {
-        CachedOutputStream cache = new CachedOutputStream();
-        try {
-            IOUtils.copy(in, cache);
-            in = cache.getInputStream();
-        } catch (IOException e) {
-            //ignore
+        if (in != origIn) {
+            CachedOutputStream cache = new CachedOutputStream();
+            try {
+                IOUtils.copy(in, cache); 
+                in = cache.getInputStream();
+                cache.close();
+            } catch (IOException e) {
+                //ignore
+            }
         }
     }
     

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java?rev=900876&r1=900875&r2=900876&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java Tue Jan 19 18:14:17 2010
@@ -28,6 +28,7 @@
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -44,6 +45,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.attachment.AttachmentDataSource;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.Base64Exception;
 import org.apache.cxf.common.util.Base64Utility;
@@ -52,9 +54,11 @@
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.io.AbstractWrappedOutputStream;
 import org.apache.cxf.io.DelegatingInputStream;
+import org.apache.cxf.message.Attachment;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.security.SecurityContext;
@@ -450,10 +454,43 @@
     }
   
    
+    /**
+     * On first write, we need to make sure any attachments and such that are still on the incoming stream 
+     * are read in.  Otherwise we can get into a deadlock where the client is still trying to send the 
+     * request, but the server is trying to send the response.   Neither side is reading and both blocked 
+     * on full buffers.  Not a good situation.    
+     * @param outMessage
+     */
+    private void cacheInput(Message outMessage) {
+        Message inMessage = outMessage.getExchange().getInMessage();
+        if (inMessage == null) {
+            return;
+        }
+        Collection<Attachment> atts = inMessage.getAttachments();
+        if (atts != null) {
+            for (Attachment a : atts) {
+                if (a.getDataHandler().getDataSource() instanceof AttachmentDataSource) {
+                    try {
+                        ((AttachmentDataSource)a.getDataHandler().getDataSource()).cache();
+                    } catch (IOException e) {
+                        throw new Fault(e);
+                    }
+                }
+            }
+        }
+        DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class);
+        if (in != null) {
+            in.cacheInput();
+        }
+    }
+    
     protected OutputStream flushHeaders(Message outMessage) throws IOException {
         if (isResponseRedirected(outMessage)) {
             return null;
         }
+        
+        cacheInput(outMessage);
+        
         updateResponseHeaders(outMessage);
         Object responseObj = outMessage.get(HTTP_RESPONSE);
         OutputStream responseStream = null;