You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2014/01/20 13:18:00 UTC

svn commit: r1559673 - in /cxf/trunk: core/src/main/java/org/apache/cxf/continuations/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/ rt/transports/http/src/main/java/org/apa...

Author: sergeyb
Date: Mon Jan 20 12:17:59 2014
New Revision: 1559673

URL: http://svn.apache.org/r1559673
Log:
[CXF-5417] Getting Servlet3Continuation callback on registered listener if client has disconnected, many thanks to Andriy Redko

Modified:
    cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java Mon Jan 20 12:17:59 2014
@@ -21,12 +21,22 @@ package org.apache.cxf.continuations;
 
 /**
  * Callback that receives continuation status updates.
- * 
- * Note: this interface is a work in progress
  */
 public interface ContinuationCallback {
     
+    /**
+     * This method is called when the container completes writing the response to the client  
+     */
     void onComplete();
     
+    /**
+     * This method is called when the exception gets propagated to the container  
+     * @param error the propagated exception instance
+     */
     void onError(Throwable error);
+    
+    /**
+     * This method may be called if the container detects that the client has disconnected
+     */
+    void onDisconnect();
 }

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java Mon Jan 20 12:17:59 2014
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import javax.ws.rs.ServiceUnavailableException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.CompletionCallback;
+import javax.ws.rs.container.ConnectionCallback;
 import javax.ws.rs.container.TimeoutHandler;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
@@ -53,7 +55,8 @@ public class AsyncResponseImpl implement
     private boolean resumedByApplication;
     private TimeoutHandler timeoutHandler;
     
-    private List<CompletionCallback> completionCallbacks;
+    private List<CompletionCallback> completionCallbacks = new LinkedList<CompletionCallback>();
+    private List<ConnectionCallback> connectionCallbacks = new LinkedList<ConnectionCallback>();
     private Throwable unmappedThrowable;
     
     public AsyncResponseImpl(Message inMessage) {
@@ -195,16 +198,16 @@ public class AsyncResponseImpl implement
             Class<?> callbackCls = allCallbacks[i].getClass();
             Collection<Class<?>> knownCallbacks = map.get(callbackCls);
             if (knownCallbacks == null) {
-                knownCallbacks = new LinkedList<Class<?>>();
+                knownCallbacks = new HashSet<Class<?>>();
                 map.put(callbackCls, knownCallbacks);
             }
             
             if (allCallbacks[i] instanceof CompletionCallback) {
                 knownCallbacks.add(CompletionCallback.class);
-                if (completionCallbacks == null) {
-                    completionCallbacks = new LinkedList<CompletionCallback>();
-                    completionCallbacks.add((CompletionCallback)allCallbacks[i]);        
-                }
+                completionCallbacks.add((CompletionCallback)allCallbacks[i]);        
+            } else if (allCallbacks[i] instanceof ConnectionCallback) {
+                knownCallbacks.add(ConnectionCallback.class);
+                connectionCallbacks.add((ConnectionCallback)allCallbacks[i]);        
             }
         }
         return map;
@@ -222,11 +225,16 @@ public class AsyncResponseImpl implement
     }
     
     private void updateCompletionCallbacks(Throwable error) {
-        if (completionCallbacks != null) {
-            Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error;
-            for (CompletionCallback completionCallback : completionCallbacks) {
-                completionCallback.onComplete(actualError);
-            }
+        Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error;
+        for (CompletionCallback completionCallback : completionCallbacks) {
+            completionCallback.onComplete(actualError);
+        }
+    }
+    
+    @Override
+    public void onDisconnect() {
+        for (ConnectionCallback connectionCallback : connectionCallbacks) {
+            connectionCallback.onDisconnect(this);
         }
     }
     

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java Mon Jan 20 12:17:59 2014
@@ -34,7 +34,6 @@ import java.util.logging.Logger;
 
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.HttpMethod;
-import javax.ws.rs.InternalServerErrorException;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
@@ -48,6 +47,7 @@ import javax.xml.stream.events.XMLEvent;
 import org.apache.cxf.common.i18n.BundleUtils;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.jaxrs.impl.ResponseImpl;
 import org.apache.cxf.jaxrs.impl.WriterInterceptorMBW;
@@ -184,9 +184,6 @@ public class JAXRSOutInterceptor extends
         // Run the filters
         try {
             JAXRSUtils.runContainerResponseFilters(providerFactory, response, message, ori, invoked);
-        } catch (IOException ex) {
-            handleWriteException(providerFactory, message, ex, firstTry);
-            return;
         } catch (Throwable ex) {
             handleWriteException(providerFactory, message, ex, firstTry);
             return;
@@ -264,9 +261,6 @@ public class JAXRSOutInterceptor extends
                 }
             }
             
-        } catch (IOException ex) {
-            logWriteError(firstTry, targetType, responseMediaType);
-            handleWriteException(providerFactory, message, ex, firstTry);
         } catch (Throwable ex) {
             logWriteError(firstTry, targetType, responseMediaType);
             handleWriteException(providerFactory, message, ex, firstTry);
@@ -366,7 +360,7 @@ public class JAXRSOutInterceptor extends
         }
         if (excResponse == null) {
             setResponseStatus(message, 500);
-            throw new InternalServerErrorException(ex);
+            throw new Fault(ex);
         } else {
             serializeMessage(pf, message, excResponse, null, false);
         } 

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java Mon Jan 20 12:17:59 2014
@@ -123,6 +123,12 @@ public class Servlet3ContinuationProvide
         public void reset() {
             context.complete();
             obj = null;
+            if (callback != null) {
+                final Exception ex = inMessage.getExchange().get(Exception.class);
+                if (ex != null && isClientDisconnected(ex)) {
+                    callback.onDisconnect();    
+                }
+            }
         }
 
         public boolean isNew() {
@@ -166,6 +172,19 @@ public class Servlet3ContinuationProvide
             //REVISIT: isResumed = true;
             redispatch();
         }
+        /**
+         * Checks whether the given exception signals that the client has disconnected.
+         * The exception's class name is compared with the value of the optional
+         * "disconnected.client.exception.class" contextual property.
+         *
+         * @param ex the exception to check, may be null
+         * @return true only if the property is set and equals the exception's class name
+         */
+        private boolean isClientDisconnected(Exception ex) {
+            String exName = (String)inMessage.getContextualProperty("disconnected.client.exception.class");
+            // the property is optional, so guard against it being unset
+            return ex != null && exName != null && exName.equals(ex.getClass().getName());
+        }
         
     }
 }