You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2012/09/28 00:08:39 UTC
svn commit: r1391236 - in /cxf/trunk:
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/
rt/frontend/jaxrs/src/main/java/org...
Author: sergeyb
Date: Thu Sep 27 22:08:38 2012
New Revision: 1391236
URL: http://svn.apache.org/viewvc?rev=1391236&view=rev
Log:
[CXF-4455] Initial support for AsyncResponse
Added:
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (with props)
Modified:
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1391236&r1=1391235&r2=1391236&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Thu Sep 27 22:08:38 2012
@@ -29,6 +29,7 @@ import java.util.ResourceBundle;
import java.util.logging.Logger;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
@@ -42,6 +43,7 @@ import org.apache.cxf.common.util.ClassH
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.InterceptorChain.State;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.impl.MetadataMap;
import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
import org.apache.cxf.jaxrs.model.ClassResourceInfo;
@@ -76,16 +78,28 @@ public class JAXRSInvoker extends Abstra
public Object invoke(Exchange exchange, Object request) {
Response response = exchange.get(Response.class);
+ if (response == null) {
+ AsyncResponse asyncResp = exchange.get(AsyncResponse.class);
+ if (asyncResp != null) {
+ AsyncResponseImpl asyncImpl = (AsyncResponseImpl)asyncResp;
+ Object asyncObj = asyncImpl.getResponseObject();
+ if (!(asyncObj instanceof Response)) {
+ if (asyncObj instanceof Throwable) {
+ return handleFault(new Fault((Throwable)asyncObj), exchange.getInMessage(), null, null);
+ } else {
+ response = Response.ok().entity(asyncObj).build();
+ }
+ } else {
+ response = (Response)asyncObj;
+ }
+ }
+ }
if (response != null) {
- // this means a blocking request filter provided a Response
- // or earlier exception has been converted to Response
-
- //TODO: should we remove response from exchange ?
- // or should we rather ignore content list and have
- // Response set here for all cases and extract it
- // in the out interceptor instead of dealing with the contents list ?
return new MessageContentsList(response);
}
+
+
+
ResourceProvider provider = getResourceProvider(exchange);
Object rootInstance = getServiceObject(exchange);
Object serviceObject = getActualServiceObject(exchange, rootInstance);
@@ -174,32 +188,15 @@ public class JAXRSInvoker extends Abstra
contextLoader = ClassLoaderUtils
.setThreadContextClassloader(resourceObject.getClass().getClassLoader());
}
+ AsyncResponse asyncResponse = inMessage.get(AsyncResponse.class);
+ if (asyncResponse != null) {
+ inMessage.put(AsyncResponse.class, null);
+ AsyncResponseImpl asyncImpl = (AsyncResponseImpl)asyncResponse;
+ asyncImpl.suspend();
+ }
result = invoke(exchange, resourceObject, methodToInvoke, params);
} catch (Fault ex) {
- String errorMessage = ex.getCause().getMessage();
- if (errorMessage != null
- && errorMessage.contains(PROXY_INVOCATION_ERROR_FRAGMENT)) {
- org.apache.cxf.common.i18n.Message errorM =
- new org.apache.cxf.common.i18n.Message("PROXY_INVOCATION_FAILURE",
- BUNDLE,
- methodToInvoke,
- cri.getServiceClass().getName());
- LOG.severe(errorM.toString());
- }
- Response excResponse = JAXRSUtils.convertFaultToResponse(ex.getCause(),
- exchange.getInMessage());
- if (excResponse == null) {
- providerFactory.clearThreadLocalProxies();
- ClassResourceInfo criRoot =
- (ClassResourceInfo)exchange.get(JAXRSUtils.ROOT_RESOURCE_CLASS);
- if (criRoot != null) {
- criRoot.clearThreadLocalProxies();
- }
- exchange.put(Message.PROPOGATE_EXCEPTION,
- JAXRSUtils.propogateException(exchange.getInMessage()));
- throw ex;
- }
- return new MessageContentsList(excResponse);
+ return handleFault(ex, inMessage, cri, methodToInvoke);
} finally {
exchange.put(LAST_SERVICE_OBJECT, resourceObject);
if (contextLoader != null) {
@@ -270,6 +267,33 @@ public class JAXRSInvoker extends Abstra
return result;
}
+
+    /**
+     * Turns a fault raised while invoking a resource method into a response.
+     *
+     * <p>The fault's cause (or the fault itself when it wraps no cause) is offered to
+     * {@code JAXRSUtils.convertFaultToResponse}. If that yields a {@code Response}, it is
+     * returned wrapped in a {@code MessageContentsList}. Otherwise the thread-local proxies
+     * are cleared, the exception propagation flag is stored on the exchange and the
+     * original fault is rethrown.
+     *
+     * @param ex the fault to handle
+     * @param inMessage the current inbound message
+     * @param cri the resource class being invoked; may be {@code null}, in which case the
+     *            proxy invocation failure is not logged
+     * @param methodToInvoke the method that was being invoked; only used for logging
+     * @return a {@code MessageContentsList} holding the converted response
+     * @throws Fault the original fault, when it cannot be converted into a response
+     */
+    private Object handleFault(Fault ex, Message inMessage,
+                               ClassResourceInfo cri, Method methodToInvoke) {
+        // A Fault is not guaranteed to wrap another exception; fall back to the fault
+        // itself so that neither the message check nor the conversion dereferences null.
+        Throwable cause = ex.getCause() == null ? ex : ex.getCause();
+        String errorMessage = cause.getMessage();
+        if (errorMessage != null && cri != null
+            && errorMessage.contains(PROXY_INVOCATION_ERROR_FRAGMENT)) {
+            org.apache.cxf.common.i18n.Message errorM =
+                new org.apache.cxf.common.i18n.Message("PROXY_INVOCATION_FAILURE",
+                                                       BUNDLE,
+                                                       methodToInvoke,
+                                                       cri.getServiceClass().getName());
+            LOG.severe(errorM.toString());
+        }
+        Response excResponse = JAXRSUtils.convertFaultToResponse(cause, inMessage);
+        if (excResponse == null) {
+            // No conversion is available: clean up per-thread state and let the fault propagate.
+            ProviderFactory.getInstance(inMessage).clearThreadLocalProxies();
+            ClassResourceInfo criRoot =
+                (ClassResourceInfo)inMessage.getExchange().get(JAXRSUtils.ROOT_RESOURCE_CLASS);
+            if (criRoot != null) {
+                criRoot.clearThreadLocalProxies();
+            }
+            inMessage.getExchange().put(Message.PROPOGATE_EXCEPTION,
+                                        JAXRSUtils.propogateException(inMessage));
+            throw ex;
+        }
+        return new MessageContentsList(excResponse);
+    }
@SuppressWarnings("unchecked")
protected MultivaluedMap<String, String> getTemplateValues(Message msg) {
Added: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java?rev=1391236&view=auto
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java (added)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java Thu Sep 27 22:08:38 2012
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.impl;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.message.Message;
+
+
+public class AsyncResponseImpl implements AsyncResponse {
+
+ private Continuation cont;
+ private Object responseObject;
+ private long timeout = 5000;
+ private Message inMessage;
+ private boolean suspended;
+ private boolean cancelled;
+ public AsyncResponseImpl(Message inMessage) {
+ ContinuationProvider provider =
+ (ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
+ cont = provider.getContinuation();
+ inMessage.put(AsyncResponse.class, this);
+ this.inMessage = inMessage;
+
+ }
+
+ @Override
+ public void resume(Object response) throws IllegalStateException {
+ doResume(response);
+ }
+
+ @Override
+ public void resume(Throwable response) throws IllegalStateException {
+ doResume(response);
+ }
+
+ private void doResume(Object response) throws IllegalStateException {
+ responseObject = response;
+ inMessage.getExchange().put(AsyncResponse.class, this);
+ suspended = false;
+ cont.resume();
+ }
+
+ @Override
+ public void cancel() {
+ cancel(-1);
+ }
+
+ @Override
+ //TODO: has to be long
+ public void cancel(int retryAfter) {
+ cancelled = true;
+ doResume(Response.status(503).header(HttpHeaders.RETRY_AFTER, Integer.toString(retryAfter)).build());
+ }
+
+ @Override
+ public void cancel(Date retryAfter) {
+ cancel((int)(retryAfter.getTime() - new Date().getTime()));
+ }
+
+ @Override
+ public boolean isSuspended() {
+ return suspended;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ @Override
+ public boolean isDone() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void setTimeout(long time, TimeUnit unit) throws IllegalStateException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void setTimeoutHandler(TimeoutHandler handler) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean register(Class<?> callback) throws NullPointerException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean[] register(Class<?> callback, Class<?>... callbacks) throws NullPointerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean register(Object callback) throws NullPointerException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean[] register(Object callback, Object... callbacks) throws NullPointerException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ // these methods are called by the runtime, not part of AsyncResponse
+ public void suspend() {
+ cont.setObject(this);
+ cont.suspend(timeout);
+ }
+
+ public Object getResponseObject() {
+ return responseObject;
+ }
+}
Propchange: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/AsyncResponseImpl.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java?rev=1391236&r1=1391235&r2=1391236&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/interceptor/JAXRSOutInterceptor.java Thu Sep 27 22:08:38 2012
@@ -35,6 +35,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
@@ -164,11 +165,12 @@ public class JAXRSOutInterceptor extends
Response response,
OperationResourceInfo ori,
boolean firstTry) {
+ final Exchange exchange = message.getExchange();
int status = response.getStatus();
Object responseObj = response.getEntity();
if (status == 200 && !isResponseNull(responseObj) && firstTry
&& ori != null && JAXRSUtils.headMethodPossible(ori.getHttpMethod(),
- (String)message.getExchange().getInMessage().get(Message.HTTP_REQUEST_METHOD))) {
+ (String)exchange.getInMessage().get(Message.HTTP_REQUEST_METHOD))) {
LOG.info(new org.apache.cxf.common.i18n.Message("HEAD_WITHOUT_ENTITY", BUNDLE).toString());
responseObj = null;
}
@@ -200,7 +202,7 @@ public class JAXRSOutInterceptor extends
return;
}
- Object ignoreWritersProp = message.getExchange().get(JAXRSUtils.IGNORE_MESSAGE_WRITERS);
+ Object ignoreWritersProp = exchange.get(JAXRSUtils.IGNORE_MESSAGE_WRITERS);
boolean ignoreWriters =
ignoreWritersProp == null ? false : Boolean.valueOf(ignoreWritersProp.toString());
if (ignoreWriters) {
@@ -215,9 +217,9 @@ public class JAXRSOutInterceptor extends
invoked = ori == null ? null : ori.getAnnotatedMethod() == null
? ori.getMethodToInvoke() : ori.getAnnotatedMethod();
}
-
- Class<?> targetType = getRawResponseClass(invoked, responseObj);
- Type genericType = getGenericResponseType(invoked, responseObj, targetType);
+ boolean asyncResponse = exchange.get(AsyncResponse.class) != null;
+ Class<?> targetType = getRawResponseClass(invoked, responseObj, asyncResponse);
+ Type genericType = getGenericResponseType(invoked, responseObj, targetType, asyncResponse);
if (genericType instanceof TypeVariable) {
genericType = InjectionUtils.getSuperType(ori.getClassResourceInfo().getServiceClass(),
(TypeVariable<?>)genericType);
@@ -413,21 +415,22 @@ public class JAXRSOutInterceptor extends
}
- private Class<?> getRawResponseClass(Method invoked, Object targetObject) {
+ private Class<?> getRawResponseClass(Method invoked, Object targetObject, boolean async) {
if (GenericEntity.class.isAssignableFrom(targetObject.getClass())) {
return ((GenericEntity<?>)targetObject).getRawType();
} else {
Class<?> targetClass = targetObject.getClass();
- Class<?> responseClass = invoked == null
+ Class<?> responseClass = async || invoked == null
|| !invoked.getReturnType().isAssignableFrom(targetClass) ? targetClass : invoked.getReturnType();
return ClassHelper.getRealClassFromClass(responseClass);
}
}
- private Type getGenericResponseType(Method invoked, Object targetObject, Class<?> targetType) {
+ private Type getGenericResponseType(Method invoked, Object targetObject, Class<?> targetType,
+ boolean async) {
if (GenericEntity.class.isAssignableFrom(targetObject.getClass())) {
return ((GenericEntity<?>)targetObject).getType();
- } else if (invoked == null || !invoked.getReturnType().isAssignableFrom(targetType)) {
+ } else if (async || invoked == null || !invoked.getReturnType().isAssignableFrom(targetType)) {
// when a method has been invoked it is still possible that either an ExceptionMapper
// or a ResponseHandler filter overrides a response entity; if it happens then
// the Type is the class of the response object, unless this new entity is assignable
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java?rev=1391236&r1=1391235&r2=1391236&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java Thu Sep 27 22:08:38 2012
@@ -63,6 +63,7 @@ import javax.ws.rs.RedirectionException;
import javax.ws.rs.ServerErrorException;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseContext;
@@ -101,6 +102,7 @@ import org.apache.cxf.jaxrs.ext.MessageC
import org.apache.cxf.jaxrs.ext.ProtocolHeaders;
import org.apache.cxf.jaxrs.ext.ProtocolHeadersImpl;
import org.apache.cxf.jaxrs.ext.multipart.MultipartBody;
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
import org.apache.cxf.jaxrs.impl.ContainerRequestContextImpl;
import org.apache.cxf.jaxrs.impl.ContainerResponseContextImpl;
import org.apache.cxf.jaxrs.impl.HttpHeadersImpl;
@@ -644,6 +646,10 @@ public final class JAXRSUtils {
if (parameter.getType() == ParameterType.REQUEST_BODY) {
+ if (parameterClass == AsyncResponse.class) {
+ return new AsyncResponseImpl(message);
+ }
+
String contentType = (String)message.get(Message.CONTENT_TYPE);
if (contentType == null) {
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java?rev=1391236&r1=1391235&r2=1391236&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/BookContinuationStore.java Thu Sep 27 22:08:38 2012
@@ -27,37 +27,26 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Resource;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
-
-import org.apache.cxf.continuations.Continuation;
-import org.apache.cxf.continuations.ContinuationProvider;
-import org.apache.cxf.jaxrs.ext.MessageContext;
+import javax.ws.rs.container.AsyncResponse;
@Path("/bookstore")
public class BookContinuationStore {
private Map<String, String> books = new HashMap<String, String>();
- private Map<String, Continuation> suspended =
- new HashMap<String, Continuation>();
private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
- @Resource
- private MessageContext context;
-
public BookContinuationStore() {
init();
}
@GET
@Path("/books/{id}")
- public String getBookDescription(@PathParam("id") String id) {
-
- return handleContinuationRequest(id);
-
+ public void getBookDescription(@PathParam("id") String id, AsyncResponse async) {
+ handleContinuationRequest(id, async);
}
@Path("/books/subresources/")
@@ -69,78 +58,25 @@ public class BookContinuationStore {
@GET
@Path("{id}")
- public String handleContinuationRequest(@PathParam("id") String id) {
- Continuation continuation = getContinuation(id);
- if (continuation == null) {
- throw new RuntimeException("Failed to get continuation");
- }
- synchronized (continuation) {
- if (continuation.isNew()) {
- continuation.setObject(id);
- suspendInvocation(id, continuation);
- } else {
- String savedId = continuation.getObject().toString();
- if (!savedId.equals(id)) {
- throw new RuntimeException("SavedId is wrong");
- }
- return books.get(savedId);
- }
- }
- // unreachable
- return null;
+ public void handleContinuationRequest(@PathParam("id") String id, AsyncResponse response) {
+ resumeSuspended(id, response);
}
- private void resumeRequest(final String name) {
-
- Continuation suspendedCont = null;
- synchronized (suspended) {
- suspendedCont = suspended.get(name);
- }
-
- if (suspendedCont != null) {
- synchronized (suspendedCont) {
- suspendedCont.resume();
- }
- }
- }
- private void suspendInvocation(final String name, Continuation cont) {
-
- //System.out.println("Suspending invocation for " + name);
-
- try {
- cont.suspend(500000);
- } finally {
- synchronized (suspended) {
- suspended.put(name, cont);
- }
- executor.execute(new Runnable() {
- public void run() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ex) {
- // ignore
- }
- resumeRequest(name);
- }
- });
- }
- }
- private Continuation getContinuation(String name) {
-
- //System.out.println("Getting continuation for " + name);
+ private void resumeSuspended(final String id, final AsyncResponse response) {
- synchronized (suspended) {
- Continuation suspendedCont = suspended.remove(name);
- if (suspendedCont != null) {
- return suspendedCont;
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ // ignore
+ }
+ response.resume(books.get(id));
}
- }
+ });
- ContinuationProvider provider =
- (ContinuationProvider)context.get(ContinuationProvider.class.getName());
- return provider.getContinuation();
}
private void init() {