You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@cxf.apache.org by "iris ding (JIRA)" <ji...@apache.org> on 2015/05/05 08:33:05 UTC
[jira] [Commented] (CXF-6389) set initialSuspend=true incorrectly
when resume the asyncresponse
[ https://issues.apache.org/jira/browse/CXF-6389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14528015#comment-14528015 ]
iris ding commented on CXF-6389:
--------------------------------
My client:
public void cancelVoidOnResumedTest()throws Exception {
String expectedResponse = "Expected response";
Future<Response> suspend = invokeRequest("suspend");
Future<Response> resume = invokeRequest("resume?stage=0",
expectedResponse);
assertString(resume, Resource.TRUE);
assertString(suspend, expectedResponse);
Future<Response> cancel = invokeRequest("cancelvoid?stage=1");
assertString(cancel, Resource.FALSE);
}
private static void assertString(Future<Response> future, String check)
throws Exception {
Response response = getResponse(future);
assertStatus(response, Status.OK);
String content = response.readEntity(String.class);
assertEquals(check, content, "Unexpected response content", content);
logMsg("Found expected string", check);
}
private <T> Future<Response> invokeRequest(String resource, T entity) {
AsyncInvoker async = createAsyncInvoker(resource);
Future<Response> future = async.post(Entity.entity(entity,
MediaType.TEXT_PLAIN_TYPE));
return future;
}
private Future<Response> invokeRequest(String resource) {
AsyncInvoker async = createAsyncInvoker(resource);
Future<Response> future = async.get();
return future;
}
private AsyncInvoker createAsyncInvoker(String resource) {
WebTarget target = createWebTarget(resource);
AsyncInvoker async = target.request().async();
return async;
}
// The test is as below:
//step1- call below resource method:
@Path("suspend")
public void suspend(@Suspended AsyncResponse asyncResponse)
{
stage[0].add(asyncResponse);
}
//step2: resume the asyncresponse:
@POST
@Path("resume")
public String resume(@QueryParam("stage") String stage, String response)
{
AsyncResponse async = takeAsyncResponse(stage);
boolean b = resume(async, response);
addResponse(async, stage);
return b ? TRUE : FALSE;
}
//step3: cancel the asyncresponse, in this case, since the asyncresponse has already been resumed, the cancel should return 'false":
@GET
@Path("cancelvoid")
public String cancel(@QueryParam("stage") String stage)
{
AsyncResponse response = takeAsyncResponse(stage);
boolean ret = response.cancel();
System.out.println("*** response.cancel() 1 " + ret);
ret &= response.cancel();
System.out.println("*** response.cancel() 2 " + ret);
addResponse(response, stage);
return ret ? TRUE : FALSE;
}
But currently, CXF returns true. The issue here is in :
org.apache.cxf.jaxrs.JAXRSInvoker.invoke(Exchange exchange, Object request) -> asyncImpl.prepareContinuation()->asyncImpl.initContinuation().
in asyncImpl.initContinuation() it will set initialSuspend = true;
The proposed change is modify org.apache.cxf.jaxrs.impl.AsyncResponseImpl.prepareContinuation(), just remove the "initialSuspend = true" in initContinuation().
public void prepareContinuation() {
// initContinuation();
ContinuationProvider provider =
(ContinuationProvider) inMessage.get(ContinuationProvider.class.getName());
cont = provider.getContinuation();
}
> set initialSuspend=true incorrectly when resume the asyncresponse
> -----------------------------------------------------------------
>
> Key: CXF-6389
> URL: https://issues.apache.org/jira/browse/CXF-6389
> Project: CXF
> Issue Type: Bug
> Components: JAX-RS
> Affects Versions: 3.0.0, 3.0.3, 2.7.15
> Reporter: iris ding
>
> My Resource class:
> @Path("resource")
> public class Resource
> {
> public static final String RESUMED = "Response resumed";
> public static final String FALSE = "A method returned false";
> public static final String TRUE = "A method return true";
> //
> private static final AsyncResponseBlockingQueue[] stage = {
> new AsyncResponseBlockingQueue(1),
> new AsyncResponseBlockingQueue(1),
> new AsyncResponseBlockingQueue(1)};
> @GET
> @Path("suspend")
> public void suspend(@Suspended AsyncResponse asyncResponse)
> {
> stage[0].add(asyncResponse);
> }
>
> @GET
> @Path("cancelvoid")
> public String cancel(@QueryParam("stage") String stage)
> {
> AsyncResponse response = takeAsyncResponse(stage);
> boolean ret = response.cancel();
> System.out.println("*** response.cancel() 1 " + ret);
> ret &= response.cancel();
> System.out.println("*** response.cancel() 2 " + ret);
> addResponse(response, stage);
> return ret ? TRUE : FALSE;
> }
>
> @POST
> @Path("resume")
> public String resume(@QueryParam("stage") String stage, String response)
> {
> AsyncResponse async = takeAsyncResponse(stage);
> boolean b = resume(async, response);
> addResponse(async, stage);
> return b ? TRUE : FALSE;
> }
>
> protected static AsyncResponse takeAsyncResponse(String stageId)
> {
> return takeAsyncResponse(Integer.parseInt(stageId));
> }
> protected static AsyncResponse takeAsyncResponse(int stageId)
> {
> final ResponseBuilder error = createErrorResponseBuilder();
> AsyncResponse asyncResponse = null;
> try
> {
> asyncResponse = stage[stageId].take();
> }
> catch (InterruptedException e)
> {
> throw new WebApplicationException(error.entity(
> "ArrayBlockingQueue#take").build());
> }
> return asyncResponse;
> }
> protected static final void addResponse(AsyncResponse response, String stageId)
> {
> int id = Integer.parseInt(stageId) + 1;
> if (id != stage.length)
> stage[id].add(response);
> }
> protected static boolean resume(AsyncResponse takenResponse, Object response)
> {
> return takenResponse.resume(response);
> }
> protected static ResponseBuilder createErrorResponseBuilder()
> {
> return Response.status(Status.EXPECTATION_FAILED);
> }
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)