You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2014/01/20 13:18:00 UTC
svn commit: r1559673 - in /cxf/trunk:
core/src/main/java/org/apache/cxf/continuations/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/
rt/transports/http/src/main/java/org/apa...
Author: sergeyb
Date: Mon Jan 20 12:17:59 2014
New Revision: 1559673
URL: http://svn.apache.org/r1559673
Log:
[CXF-5417] Getting Servlet3Continuation callback on registered listener if client has disconnected, many thanks to Andriy Redko
Modified:
cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
Modified: cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/continuations/ContinuationCallback.java Mon Jan 20 12:17:59 2014
@@ -21,12 +21,22 @@ package org.apache.cxf.continuations;
/**
* Callback that receives continuation status updates.
- *
- * Note: this interface is a work in progress
*/
public interface ContinuationCallback {
+ /**
+ * This method is called when the container completes writing the response to the client
+ */
void onComplete();
+ /**
+ * This method is called when the exception gets propagated to the container
+ * @param error the propagated exception instance
+ */
void onError(Throwable error);
+
+ /**
+ * This method may be called if the container detects that the client has disconnected
+ */
+ void onDisconnect();
}
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java Mon Jan 20 12:17:59 2014
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
+import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
@@ -53,7 +55,8 @@ public class AsyncResponseImpl implement
private boolean resumedByApplication;
private TimeoutHandler timeoutHandler;
- private List<CompletionCallback> completionCallbacks;
+ private List<CompletionCallback> completionCallbacks = new LinkedList<CompletionCallback>();
+ private List<ConnectionCallback> connectionCallbacks = new LinkedList<ConnectionCallback>();
private Throwable unmappedThrowable;
public AsyncResponseImpl(Message inMessage) {
@@ -195,16 +198,16 @@ public class AsyncResponseImpl implement
Class<?> callbackCls = allCallbacks[i].getClass();
Collection<Class<?>> knownCallbacks = map.get(callbackCls);
if (knownCallbacks == null) {
- knownCallbacks = new LinkedList<Class<?>>();
+ knownCallbacks = new HashSet<Class<?>>();
map.put(callbackCls, knownCallbacks);
}
if (allCallbacks[i] instanceof CompletionCallback) {
knownCallbacks.add(CompletionCallback.class);
- if (completionCallbacks == null) {
- completionCallbacks = new LinkedList<CompletionCallback>();
- completionCallbacks.add((CompletionCallback)allCallbacks[i]);
- }
+ completionCallbacks.add((CompletionCallback)allCallbacks[i]);
+ } else if (allCallbacks[i] instanceof ConnectionCallback) {
+ knownCallbacks.add(ConnectionCallback.class);
+ connectionCallbacks.add((ConnectionCallback)allCallbacks[i]);
}
}
return map;
@@ -222,11 +225,16 @@ public class AsyncResponseImpl implement
}
private void updateCompletionCallbacks(Throwable error) {
- if (completionCallbacks != null) {
- Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error;
- for (CompletionCallback completionCallback : completionCallbacks) {
- completionCallback.onComplete(actualError);
- }
+ Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error;
+ for (CompletionCallback completionCallback : completionCallbacks) {
+ completionCallback.onComplete(actualError);
+ }
+ }
+
+ @Override
+ public void onDisconnect() { // relays the disconnect notification to every registered ConnectionCallback
+ for (ConnectionCallback connectionCallback : connectionCallbacks) {
+ connectionCallback.onDisconnect(this); // each callback is handed this instance
}
}
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java Mon Jan 20 12:17:59 2014
@@ -34,7 +34,6 @@ import java.util.logging.Logger;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.HttpMethod;
-import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.Produces;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
@@ -48,6 +47,7 @@ import javax.xml.stream.events.XMLEvent;
import org.apache.cxf.common.i18n.BundleUtils;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
+import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.jaxrs.impl.ResponseImpl;
import org.apache.cxf.jaxrs.impl.WriterInterceptorMBW;
@@ -184,9 +184,6 @@ public class JAXRSOutInterceptor extends
// Run the filters
try {
JAXRSUtils.runContainerResponseFilters(providerFactory, response, message, ori, invoked);
- } catch (IOException ex) {
- handleWriteException(providerFactory, message, ex, firstTry);
- return;
} catch (Throwable ex) {
handleWriteException(providerFactory, message, ex, firstTry);
return;
@@ -264,9 +261,6 @@ public class JAXRSOutInterceptor extends
}
}
- } catch (IOException ex) {
- logWriteError(firstTry, targetType, responseMediaType);
- handleWriteException(providerFactory, message, ex, firstTry);
} catch (Throwable ex) {
logWriteError(firstTry, targetType, responseMediaType);
handleWriteException(providerFactory, message, ex, firstTry);
@@ -366,7 +360,7 @@ public class JAXRSOutInterceptor extends
}
if (excResponse == null) {
setResponseStatus(message, 500);
- throw new InternalServerErrorException(ex);
+ throw new Fault(ex);
} else {
serializeMessage(pf, message, excResponse, null, false);
}
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java?rev=1559673&r1=1559672&r2=1559673&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java Mon Jan 20 12:17:59 2014
@@ -123,6 +123,12 @@ public class Servlet3ContinuationProvide
public void reset() { // completes the async context, clears obj, then reports a detected client disconnect
context.complete();
obj = null;
+ if (callback != null) { // nothing to notify when no callback is set
+ final Exception ex = inMessage.getExchange().get(Exception.class); // exception stored on the exchange, if any
+ if (ex != null && isClientDisconnected(ex)) {
+ callback.onDisconnect();
+ }
+ }
}
public boolean isNew() {
@@ -166,6 +172,10 @@ public class Servlet3ContinuationProvide
//REVISIT: isResumed = true;
redispatch();
}
+ private boolean isClientDisconnected(Exception ex) { // true when ex's class name equals the configured property value
+ String exName = (String)inMessage.getContextualProperty("disconnected.client.exception.class");
+ return ex != null && exName != null && exName.equals(ex.getClass().getName()); // exName may be null if the property is not set
+ }
}
}