You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/21 20:18:36 UTC
svn commit: r1388597 - in /cxf/trunk:
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/
rt/transports/http/src/main/java/org/apache/cxf/transport/http/
systests/jaxrs/src/test...
Author: dkulp
Date: Fri Sep 21 18:18:35 2012
New Revision: 1388597
URL: http://svn.apache.org/viewvc?rev=1388597&view=rev
Log:
Add first pass at some Async methods on the JAX-RS WebClient (ok, just an async get so far, but have to start someplace)
Added:
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
Modified:
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
Added: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java?rev=1388597&view=auto
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java (added)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java Fri Sep 21 18:18:35 2012
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.ClientException;
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.endpoint.ClientCallback;
+
+class JaxrsClientCallback<T> extends ClientCallback {
+ private final InvocationCallback<T> handler;
+ private final Type outType;
+ private final Class<?> responseClass;
+
+ public JaxrsClientCallback(final InvocationCallback<T> handler,
+ Class<?> responseClass,
+ Type outGenericType) {
+ this.handler = handler;
+ this.outType = outGenericType;
+ this.responseClass = responseClass;
+ }
+
+ public Type getOutGenericType() {
+ return outType;
+ }
+ public Class<?> getResponseClass() {
+ return responseClass;
+ }
+
+ public Future<T> createFuture() {
+ return new JaxrsResponseCallback<T>(this);
+ }
+ static class JaxrsResponseCallback<T> implements Future<T> {
+ JaxrsClientCallback<T> callback;
+ public JaxrsResponseCallback(JaxrsClientCallback<T> cb) {
+ callback = cb;
+ }
+
+ public Map<String, Object> getContext() {
+ try {
+ return callback.getResponseContext();
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return callback.cancel(mayInterruptIfRunning);
+ }
+ @SuppressWarnings("unchecked")
+ public T get() throws InterruptedException, ExecutionException {
+ return (T)callback.get()[0];
+ }
+ @SuppressWarnings("unchecked")
+ public T get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ return (T)callback.get(timeout, unit)[0];
+ }
+ public boolean isCancelled() {
+ return callback.isCancelled();
+ }
+ public boolean isDone() {
+ return callback.isDone();
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public void handleResponse(Map<String, Object> ctx, Object[] res) {
+ context = ctx;
+ result = res;
+ if (handler != null) {
+ handler.completed((T)res[0]);
+ }
+ done = true;
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+
+ @Override
+ public void handleException(Map<String, Object> ctx, final Throwable ex) {
+ context = ctx;
+ if (ex instanceof ClientException) {
+ exception = ex;
+ } else {
+ exception = new ClientException(ex);
+ }
+ if (handler != null) {
+ handler.failed((ClientException)exception);
+ }
+ done = true;
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+}
\ No newline at end of file
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java Fri Sep 21 18:18:35 2012
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.client;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
+import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.Arrays;
@@ -28,9 +29,11 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.ClientException;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.Cookie;
import javax.ws.rs.core.EntityTag;
import javax.ws.rs.core.HttpHeaders;
@@ -56,6 +59,8 @@ import org.apache.cxf.jaxrs.utils.JAXRSU
import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
/**
@@ -74,10 +79,36 @@ public class WebClient extends AbstractC
protected WebClient(URI baseAddress) {
super(baseAddress);
+ cfg.getInInterceptors().add(new ClientAsyncResponseInterceptor());
}
protected WebClient(ClientState state) {
super(state);
+ cfg.getInInterceptors().add(new ClientAsyncResponseInterceptor());
+ }
+
+
+ class ClientAsyncResponseInterceptor extends AbstractPhaseInterceptor<Message> {
+ public ClientAsyncResponseInterceptor() {
+ super(Phase.UNMARSHAL);
+ }
+
+ @Override
+ public void handleMessage(Message message) throws Fault {
+ if (message.getExchange().isSynchronous()) {
+ return;
+ }
+ handleAsyncResponse(message);
+ }
+
+ @Override
+ public void handleFault(Message message) {
+ if (message.getExchange().isSynchronous()) {
+ return;
+ }
+ handleAsyncFault(message);
+ }
+
}
/**
@@ -468,6 +499,10 @@ public class WebClient extends AbstractC
return invoke("GET", null, responseClass);
}
+ public <T> Future<T> get(InvocationCallback<T> callback) {
+ return doInvokeAsync("GET", null, null, null, callback);
+ }
+
/**
* Updates the current URI path
* @param path new relative path segment
@@ -746,7 +781,100 @@ public class WebClient extends AbstractC
}
return r;
}
+
+ private ParameterizedType findCallbackType(Class<?> cls) {
+ if (cls == null || cls == Object.class) {
+ return null;
+ }
+ for (Type c2 : cls.getGenericInterfaces()) {
+ if (c2 instanceof ParameterizedType) {
+ ParameterizedType pt = (ParameterizedType)c2;
+ if (InvocationCallback.class.equals(pt.getRawType())) {
+ return pt;
+ }
+ }
+ }
+ return findCallbackType(cls.getSuperclass());
+ }
+ private Type getCallbackType(InvocationCallback<?> callback) {
+ Class<?> cls = callback.getClass();
+ ParameterizedType t = findCallbackType(cls);
+ for (Type tp : t.getActualTypeArguments()) {
+ return tp;
+ }
+ return null;
+ }
+ protected <T> Future<T> doInvokeAsync(String httpMethod,
+ Object body,
+ Class<?> requestClass,
+ Type inGenericType,
+ InvocationCallback<T> callback) {
+ MultivaluedMap<String, String> headers = getHeaders();
+ boolean contentTypeNotSet = headers.getFirst(HttpHeaders.CONTENT_TYPE) == null;
+ if (contentTypeNotSet) {
+ String ct = "*/*";
+ if (body != null) {
+ ct = body instanceof Form ? MediaType.APPLICATION_FORM_URLENCODED
+ : MediaType.APPLICATION_XML;
+ }
+ headers.putSingle(HttpHeaders.CONTENT_TYPE, ct);
+ }
+ Type outGenericType = getCallbackType(callback);
+ Class<?> responseClass = outGenericType instanceof Class ? (Class<?>) outGenericType : null;
+ if (responseClass != null && responseClass != Response.class
+ && headers.getFirst(HttpHeaders.ACCEPT) == null) {
+ headers.putSingle(HttpHeaders.ACCEPT, MediaType.APPLICATION_XML_TYPE.toString());
+ }
+ resetResponse();
+ URI uri = getCurrentURI();
+ Exchange exchange = null;
+ Map<String, Object> invContext = null;
+
+ Message m = createMessage(body, httpMethod, headers, uri, exchange,
+ invContext, false);
+
+ m.getExchange().setSynchronous(false);
+
+ Map<String, Object> reqContext = getRequestContext(m);
+ reqContext.put(Message.HTTP_REQUEST_METHOD, httpMethod);
+ reqContext.put(REQUEST_CLASS, requestClass);
+ reqContext.put(REQUEST_TYPE, inGenericType);
+ reqContext.put(RESPONSE_CLASS, responseClass);
+ reqContext.put(RESPONSE_TYPE, outGenericType);
+
+ if (body != null) {
+ m.getInterceptorChain().add(new BodyWriter());
+ }
+ setPlainOperationNameProperty(m, httpMethod + ":" + uri.toString());
+
+ JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(callback, responseClass, outGenericType);
+ m.getExchange().put(JaxrsClientCallback.class, cb);
+ try {
+ m.getInterceptorChain().doIntercept(m);
+ } catch (Exception ex) {
+ m.setContent(Exception.class, ex);
+ }
+
+ return cb.createFuture();
+ }
+ private void handleAsyncResponse(Message message) {
+ JaxrsClientCallback<?> cb = message.getExchange().get(JaxrsClientCallback.class);
+ Response r = handleResponse(message.getExchange().getOutMessage(),
+ cb.getResponseClass(),
+ cb.getOutGenericType());
+
+ if (cb.getResponseClass() == null || Response.class.equals(cb.getResponseClass())) {
+ cb.handleResponse(message, new Object[] {r});
+ } else {
+ cb.handleResponse(message, new Object[] {r.getEntity()});
+ }
+ }
+ public void handleAsyncFault(Message message) {
+ }
+
+
+
@Override
protected Object retryInvoke(URI newRequestURI,
MultivaluedMap<String, String> headers,
@@ -795,6 +923,11 @@ public class WebClient extends AbstractC
} catch (Exception ex) {
m.setContent(Exception.class, ex);
}
+ return doResponse(m, responseClass, outGenericType);
+ }
+ protected Response doResponse(Message m,
+ Class<?> responseClass,
+ Type outGenericType) {
try {
Object[] results = preProcessResult(m);
if (results != null && results.length == 1) {
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java Fri Sep 21 18:18:35 2012
@@ -41,7 +41,7 @@ import org.apache.cxf.phase.Phase;
public class ClientResponseFilterInterceptor extends AbstractInDatabindingInterceptor {
public ClientResponseFilterInterceptor() {
- super(Phase.UNMARSHAL);
+ super(Phase.PRE_PROTOCOL_FRONTEND);
}
public void handleMessage(Message inMessage) throws Fault {
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Fri Sep 21 18:18:35 2012
@@ -1124,7 +1124,11 @@ public abstract class HTTPConduit
((PhaseInterceptorChain)outMessage.getInterceptorChain()).abort();
((PhaseInterceptorChain)outMessage.getInterceptorChain()).unwind(outMessage);
outMessage.setContent(Exception.class, e);
- outMessage.getInterceptorChain().getFaultObserver().onMessage(outMessage);
+ MessageObserver mo = outMessage.getInterceptorChain().getFaultObserver();
+ if (mo == null) {
+ mo = outMessage.getExchange().get(MessageObserver.class);
+ }
+ mo.onMessage(outMessage);
}
}
};
Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java Fri Sep 21 18:18:35 2012
@@ -22,18 +22,23 @@ package org.apache.cxf.systest.jaxrs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.ClientException;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.ClientResponseContext;
import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ReaderInterceptor;
import javax.ws.rs.ext.ReaderInterceptorContext;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;
+import javax.xml.ws.Holder;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -55,12 +60,22 @@ public class JAXRS20ClientServerBookTest
String address = "http://localhost:" + PORT + "/bookstore/bookheaders/simple";
doTestBook(address);
}
+ @Test
+ public void testGetBookAsync() throws Exception {
+ String address = "http://localhost:" + PORT + "/bookstore/bookheaders/simple";
+ doTestBookAsync(address);
+ }
@Test
public void testGetBookWrongPath() {
String address = "http://localhost:" + PORT + "/wrongpath";
doTestBook(address);
}
+ @Test
+ public void testGetBookWrongPathAsync() throws Exception {
+ String address = "http://localhost:" + PORT + "/wrongpath";
+ doTestBookAsync(address);
+ }
private void doTestBook(String address) {
List<Object> providers = new ArrayList<Object>();
@@ -77,6 +92,31 @@ public class JAXRS20ClientServerBookTest
assertEquals("serverWrite", response.getHeaderString("ServerWriterInterceptor"));
assertEquals("http://localhost/redirect", response.getHeaderString(HttpHeaders.LOCATION));
}
+ private void doTestBookAsync(String address) throws InterruptedException, ExecutionException {
+ List<Object> providers = new ArrayList<Object>();
+ providers.add(new ClientHeaderRequestFilter());
+ providers.add(new ClientHeaderResponseFilter());
+ WebClient wc = WebClient.create(address, providers);
+ WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(1000000L);
+
+ final Holder<Book> holder = new Holder<Book>();
+ Future<Book> future = wc.get(new InvocationCallback<Book>() {
+ public void completed(Book response) {
+ holder.value = response;
+ }
+ public void failed(ClientException error) {
+ }
+ });
+ Book book = future.get();
+ assertSame(book, holder.value);
+ assertEquals(124L, book.getId());
+ Response response = wc.getResponse();
+ assertEquals("OK", response.getHeaderString("Response"));
+ assertEquals("custom", response.getHeaderString("Custom"));
+ assertEquals("simple", response.getHeaderString("Simple"));
+ assertEquals("serverWrite", response.getHeaderString("ServerWriterInterceptor"));
+ assertEquals("http://localhost/redirect", response.getHeaderString(HttpHeaders.LOCATION));
+ }
@Test
public void testClientFiltersLocalResponse() {