You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/21 20:18:36 UTC

svn commit: r1388597 - in /cxf/trunk: rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ rt/transports/http/src/main/java/org/apache/cxf/transport/http/ systests/jaxrs/src/test...

Author: dkulp
Date: Fri Sep 21 18:18:35 2012
New Revision: 1388597

URL: http://svn.apache.org/viewvc?rev=1388597&view=rev
Log:
Add first pass at some Async methods on the JAX-RS WebClient (ok, just an async get so far, but have to start someplace)

Added:
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
Modified:
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java

Added: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java?rev=1388597&view=auto
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java (added)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java Fri Sep 21 18:18:35 2012
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.ClientException;
+import javax.ws.rs.client.InvocationCallback;
+
+import org.apache.cxf.endpoint.ClientCallback;
+
+class JaxrsClientCallback<T> extends ClientCallback {
+    private final InvocationCallback<T> handler;
+    private final Type outType;
+    private final Class<?> responseClass;
+    
+    public JaxrsClientCallback(final InvocationCallback<T> handler, 
+                               Class<?> responseClass, 
+                               Type outGenericType) {
+        this.handler = handler;
+        this.outType = outGenericType;
+        this.responseClass = responseClass;
+    }
+    
+    public Type getOutGenericType() {
+        return outType;
+    }
+    public Class<?> getResponseClass() {
+        return responseClass;
+    }
+    
+    public Future<T> createFuture() {
+        return new JaxrsResponseCallback<T>(this);
+    }
+    static class JaxrsResponseCallback<T> implements Future<T> {
+        JaxrsClientCallback<T> callback;
+        public JaxrsResponseCallback(JaxrsClientCallback<T> cb) {
+            callback = cb;
+        }
+        
+        public Map<String, Object> getContext() {
+            try {
+                return callback.getResponseContext();
+            } catch (Exception ex) {
+                return null;
+            }
+        }
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return callback.cancel(mayInterruptIfRunning);
+        }
+        @SuppressWarnings("unchecked")
+        public T get() throws InterruptedException, ExecutionException {
+            return (T)callback.get()[0];
+        }
+        @SuppressWarnings("unchecked")
+        public T get(long timeout, TimeUnit unit) throws InterruptedException,
+            ExecutionException, TimeoutException {
+            return (T)callback.get(timeout, unit)[0];
+        }
+        public boolean isCancelled() {
+            return callback.isCancelled();
+        }
+        public boolean isDone() {
+            return callback.isDone();
+        }
+    }
+    
+    
+    @SuppressWarnings("unchecked")
+    public void handleResponse(Map<String, Object> ctx, Object[] res) {
+        context = ctx;
+        result = res;
+        if (handler != null) {
+            handler.completed((T)res[0]);
+        }
+        done = true;
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+
+    @Override
+    public void handleException(Map<String, Object> ctx, final Throwable ex) {
+        context = ctx;
+        if (ex instanceof ClientException) {
+            exception = ex;
+        } else {
+            exception = new ClientException(ex);
+        }
+        if (handler != null) {
+            handler.failed((ClientException)exception);
+        }
+        done = true;
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+}
\ No newline at end of file

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java Fri Sep 21 18:18:35 2012
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.client;
 
 import java.io.OutputStream;
 import java.lang.annotation.Annotation;
+import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.net.URI;
 import java.util.Arrays;
@@ -28,9 +29,11 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.client.ClientException;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.core.Cookie;
 import javax.ws.rs.core.EntityTag;
 import javax.ws.rs.core.HttpHeaders;
@@ -56,6 +59,8 @@ import org.apache.cxf.jaxrs.utils.JAXRSU
 import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
 
 
 /**
@@ -74,10 +79,36 @@ public class WebClient extends AbstractC
     
     protected WebClient(URI baseAddress) {
         super(baseAddress);
+        cfg.getInInterceptors().add(new ClientAsyncResponseInterceptor());
     }
     
     protected WebClient(ClientState state) {
         super(state);
+        cfg.getInInterceptors().add(new ClientAsyncResponseInterceptor());
+    }
+    
+    
+    class ClientAsyncResponseInterceptor extends AbstractPhaseInterceptor<Message> {
+        public ClientAsyncResponseInterceptor() {
+            super(Phase.UNMARSHAL);
+        }
+
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            if (message.getExchange().isSynchronous()) {
+                return;
+            }
+            handleAsyncResponse(message);
+        }
+
+        @Override
+        public void handleFault(Message message) {
+            if (message.getExchange().isSynchronous()) {
+                return;
+            }
+            handleAsyncFault(message);
+        }
+
     }
     
     /**
@@ -468,6 +499,10 @@ public class WebClient extends AbstractC
         return invoke("GET", null, responseClass);
     }
     
+    public <T> Future<T> get(InvocationCallback<T> callback) {
+        return doInvokeAsync("GET", null, null, null, callback);
+    }
+    
     /**
      * Updates the current URI path
      * @param path new relative path segment
@@ -746,7 +781,100 @@ public class WebClient extends AbstractC
         }
         return r;
     }
+    
+    private ParameterizedType findCallbackType(Class<?> cls) {
+        if (cls == null || cls == Object.class) {
+            return null;
+        }
+        for (Type c2 : cls.getGenericInterfaces()) {
+            if (c2 instanceof ParameterizedType) {
+                ParameterizedType pt = (ParameterizedType)c2;
+                if (InvocationCallback.class.equals(pt.getRawType())) {
+                    return pt;
+                }
+            }
+        }
+        return findCallbackType(cls.getSuperclass());
+    }
+    private Type getCallbackType(InvocationCallback<?> callback) {
+        Class<?> cls = callback.getClass();
+        ParameterizedType t = findCallbackType(cls);
+        for (Type tp : t.getActualTypeArguments()) {
+            return tp;
+        }
+        return null;
+    }
+    protected <T> Future<T> doInvokeAsync(String httpMethod, 
+                                          Object body, 
+                                          Class<?> requestClass,
+                                          Type inGenericType,
+                                          InvocationCallback<T> callback) {
+        MultivaluedMap<String, String> headers = getHeaders();
+        boolean contentTypeNotSet = headers.getFirst(HttpHeaders.CONTENT_TYPE) == null;
+        if (contentTypeNotSet) {
+            String ct = "*/*";
+            if (body != null) { 
+                ct = body instanceof Form ? MediaType.APPLICATION_FORM_URLENCODED 
+                                          : MediaType.APPLICATION_XML;
+            }
+            headers.putSingle(HttpHeaders.CONTENT_TYPE, ct);
+        }
+        Type outGenericType = getCallbackType(callback);
+        Class<?> responseClass = outGenericType instanceof Class ? (Class<?>) outGenericType : null;
+        if (responseClass != null && responseClass != Response.class 
+            && headers.getFirst(HttpHeaders.ACCEPT) == null) {
+            headers.putSingle(HttpHeaders.ACCEPT, MediaType.APPLICATION_XML_TYPE.toString());
+        }
+        resetResponse();
+        URI uri = getCurrentURI();
+        Exchange exchange = null;
+        Map<String, Object> invContext = null;
+        
+        Message m = createMessage(body, httpMethod, headers, uri, exchange, 
+                invContext, false);
+        
+        m.getExchange().setSynchronous(false);
+        
+        Map<String, Object> reqContext = getRequestContext(m);
+        reqContext.put(Message.HTTP_REQUEST_METHOD, httpMethod);
+        reqContext.put(REQUEST_CLASS, requestClass);
+        reqContext.put(REQUEST_TYPE, inGenericType);
+        reqContext.put(RESPONSE_CLASS, responseClass);
+        reqContext.put(RESPONSE_TYPE, outGenericType);
+        
+        if (body != null) {
+            m.getInterceptorChain().add(new BodyWriter());
+        }
+        setPlainOperationNameProperty(m, httpMethod + ":" + uri.toString());
+        
+        JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(callback, responseClass, outGenericType);
+        m.getExchange().put(JaxrsClientCallback.class, cb);
+        try {
+            m.getInterceptorChain().doIntercept(m);
+        } catch (Exception ex) {
+            m.setContent(Exception.class, ex);
+        }
+        
+        return cb.createFuture();
+    }
 
+    private void handleAsyncResponse(Message message) {
+        JaxrsClientCallback<?> cb = message.getExchange().get(JaxrsClientCallback.class);
+        Response r = handleResponse(message.getExchange().getOutMessage(),
+                                    cb.getResponseClass(),
+                                    cb.getOutGenericType());
+        
+        if (cb.getResponseClass() == null || Response.class.equals(cb.getResponseClass())) {
+            cb.handleResponse(message, new Object[] {r});
+        } else {
+            cb.handleResponse(message, new Object[] {r.getEntity()});
+        }
+    }
+    public void handleAsyncFault(Message message) {
+    }
+
+
+    
     @Override
     protected Object retryInvoke(URI newRequestURI, 
                                  MultivaluedMap<String, String> headers,
@@ -795,6 +923,11 @@ public class WebClient extends AbstractC
         } catch (Exception ex) {
             m.setContent(Exception.class, ex);
         }
+        return doResponse(m, responseClass, outGenericType);
+    }
+    protected Response doResponse(Message m, 
+                                  Class<?> responseClass, 
+                                  Type outGenericType) {
         try {
             Object[] results = preProcessResult(m);
             if (results != null && results.length == 1) {

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/spec/ClientResponseFilterInterceptor.java Fri Sep 21 18:18:35 2012
@@ -41,7 +41,7 @@ import org.apache.cxf.phase.Phase;
 public class ClientResponseFilterInterceptor extends AbstractInDatabindingInterceptor {
 
     public ClientResponseFilterInterceptor() {
-        super(Phase.UNMARSHAL);
+        super(Phase.PRE_PROTOCOL_FRONTEND);
     }
     
     public void handleMessage(Message inMessage) throws Fault {

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Fri Sep 21 18:18:35 2012
@@ -1124,7 +1124,11 @@ public abstract class HTTPConduit 
                         ((PhaseInterceptorChain)outMessage.getInterceptorChain()).abort();
                         ((PhaseInterceptorChain)outMessage.getInterceptorChain()).unwind(outMessage);
                         outMessage.setContent(Exception.class, e);
-                        outMessage.getInterceptorChain().getFaultObserver().onMessage(outMessage);
+                        MessageObserver mo = outMessage.getInterceptorChain().getFaultObserver();
+                        if (mo == null) {
+                            mo = outMessage.getExchange().get(MessageObserver.class);
+                        }
+                        mo.onMessage(outMessage);
                     }
                 }
             };

Modified: cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java?rev=1388597&r1=1388596&r2=1388597&view=diff
==============================================================================
--- cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java (original)
+++ cxf/trunk/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRS20ClientServerBookTest.java Fri Sep 21 18:18:35 2012
@@ -22,18 +22,23 @@ package org.apache.cxf.systest.jaxrs;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.ClientException;
 import javax.ws.rs.client.ClientRequestContext;
 import javax.ws.rs.client.ClientRequestFilter;
 import javax.ws.rs.client.ClientResponseContext;
 import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ReaderInterceptor;
 import javax.ws.rs.ext.ReaderInterceptorContext;
 import javax.ws.rs.ext.WriterInterceptor;
 import javax.ws.rs.ext.WriterInterceptorContext;
+import javax.xml.ws.Holder;
 
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -55,12 +60,22 @@ public class JAXRS20ClientServerBookTest
         String address = "http://localhost:" + PORT + "/bookstore/bookheaders/simple";
         doTestBook(address);
     }
+    @Test
+    public void testGetBookAsync() throws Exception {
+        String address = "http://localhost:" + PORT + "/bookstore/bookheaders/simple";
+        doTestBookAsync(address);
+    }
     
     @Test
     public void testGetBookWrongPath() {
         String address = "http://localhost:" + PORT + "/wrongpath";
         doTestBook(address);
     }
+    @Test
+    public void testGetBookWrongPathAsync() throws Exception {
+        String address = "http://localhost:" + PORT + "/wrongpath";
+        doTestBookAsync(address);
+    }
     
     private void doTestBook(String address) {
         List<Object> providers = new ArrayList<Object>();
@@ -77,6 +92,31 @@ public class JAXRS20ClientServerBookTest
         assertEquals("serverWrite", response.getHeaderString("ServerWriterInterceptor"));
         assertEquals("http://localhost/redirect", response.getHeaderString(HttpHeaders.LOCATION));
     }
+    private void doTestBookAsync(String address) throws InterruptedException, ExecutionException {
+        List<Object> providers = new ArrayList<Object>();
+        providers.add(new ClientHeaderRequestFilter());
+        providers.add(new ClientHeaderResponseFilter());
+        WebClient wc = WebClient.create(address, providers);
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(1000000L);
+        
+        final Holder<Book> holder = new Holder<Book>();
+        Future<Book> future = wc.get(new InvocationCallback<Book>() {
+            public void completed(Book response) {
+                holder.value = response;
+            }
+            public void failed(ClientException error) {
+            }
+        });
+        Book book = future.get();
+        assertSame(book, holder.value);
+        assertEquals(124L, book.getId());
+        Response response = wc.getResponse();
+        assertEquals("OK", response.getHeaderString("Response"));
+        assertEquals("custom", response.getHeaderString("Custom"));
+        assertEquals("simple", response.getHeaderString("Simple"));
+        assertEquals("serverWrite", response.getHeaderString("ServerWriterInterceptor"));
+        assertEquals("http://localhost/redirect", response.getHeaderString(HttpHeaders.LOCATION));
+    }
     
     @Test
     public void testClientFiltersLocalResponse() {