You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2017/02/01 12:33:36 UTC
cxf git commit: Updating to m03 jaxrs 2.1
Repository: cxf
Updated Branches:
refs/heads/master 3bebf6b49 -> ea8b6e270
Updating to JAX-RS 2.1 API milestone m03 (javax.ws.rs 2.1-m02 -> 2.1-m03)
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/ea8b6e27
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/ea8b6e27
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/ea8b6e27
Branch: refs/heads/master
Commit: ea8b6e270c3a4a62061c4269174b6326a57ea2b5
Parents: 3bebf6b
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Feb 1 12:33:14 2017 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Feb 1 12:33:14 2017 +0000
----------------------------------------------------------------------
parent/pom.xml | 2 +-
.../apache/cxf/jaxrs/client/AsyncClient.java | 2 +-
.../cxf/jaxrs/client/AsyncInvokerImpl.java | 218 +++++++++
.../cxf/jaxrs/client/ClientProviderFactory.java | 12 +-
.../client/CompletionStageRxInvokerImpl.java | 163 +++++++
.../cxf/jaxrs/client/SyncInvokerImpl.java | 161 +++++++
.../org/apache/cxf/jaxrs/client/WebClient.java | 473 +------------------
.../client/spec/InvocationBuilderImpl.java | 33 +-
.../rx/client/ObservableRxInvokerProvider.java | 16 +-
.../jaxrs/reactive/JAXRSReactiveTest.java | 11 +-
10 files changed, 600 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 3427503..e399855 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -109,7 +109,7 @@
<cxf.geronimo.transaction.version>1.1.1</cxf.geronimo.transaction.version>
<cxf.jasypt.bundle.version>1.9.0_1</cxf.jasypt.bundle.version>
<cxf.javassist.version>3.19.0-GA</cxf.javassist.version>
- <cxf.javax.ws.rs.version>2.1-m02</cxf.javax.ws.rs.version>
+ <cxf.javax.ws.rs.version>2.1-m03</cxf.javax.ws.rs.version>
<cxf.jaxb.version>2.2.11</cxf.jaxb.version>
<cxf.jaxb.impl.version>${cxf.jaxb.version}</cxf.jaxb.impl.version>
<cxf.jaxb.core.version>${cxf.jaxb.version}</cxf.jaxb.core.version>
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
index e81a090..0766718 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
@@ -20,7 +20,7 @@ package org.apache.cxf.jaxrs.client;
import java.lang.reflect.Type;
-//Work in progress
+//Work in progress. May be removed once the Rx client work is finalized
public interface AsyncClient {
void prepareAsyncClient(String httpMethod,
Object body,
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
new file mode 100644
index 0000000..c51a969
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncInvokerImpl.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import java.util.concurrent.Future;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.AsyncInvoker;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+public class AsyncInvokerImpl implements AsyncInvoker {
+
+ private WebClient wc;
+
+ public AsyncInvokerImpl(WebClient wc) {
+ this.wc = wc;
+ }
+ @Override
+ public Future<Response> get() {
+ return get(Response.class);
+ }
+
+ @Override
+ public <T> Future<T> get(Class<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <T> Future<T> get(GenericType<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <T> Future<T> get(InvocationCallback<T> callback) {
+ return method(HttpMethod.GET, callback);
+ }
+
+ @Override
+ public Future<Response> put(Entity<?> entity) {
+ return put(entity, Response.class);
+ }
+
+ @Override
+ public <T> Future<T> put(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public <T> Future<T> put(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public <T> Future<T> put(Entity<?> entity, InvocationCallback<T> callback) {
+ return method(HttpMethod.PUT, entity, callback);
+ }
+
+ @Override
+ public Future<Response> post(Entity<?> entity) {
+ return post(entity, Response.class);
+ }
+
+ @Override
+ public <T> Future<T> post(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <T> Future<T> post(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <T> Future<T> post(Entity<?> entity, InvocationCallback<T> callback) {
+ return method(HttpMethod.POST, entity, callback);
+ }
+
+ @Override
+ public Future<Response> delete() {
+ return delete(Response.class);
+ }
+
+ @Override
+ public <T> Future<T> delete(Class<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <T> Future<T> delete(GenericType<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <T> Future<T> delete(InvocationCallback<T> callback) {
+ return method(HttpMethod.DELETE, callback);
+ }
+
+ @Override
+ public Future<Response> head() {
+ return method(HttpMethod.HEAD);
+ }
+
+ @Override
+ public Future<Response> head(InvocationCallback<Response> callback) {
+ return method(HttpMethod.HEAD, callback);
+ }
+
+ @Override
+ public Future<Response> options() {
+ return options(Response.class);
+ }
+
+ @Override
+ public <T> Future<T> options(Class<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <T> Future<T> options(GenericType<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <T> Future<T> options(InvocationCallback<T> callback) {
+ return method(HttpMethod.OPTIONS, callback);
+ }
+
+ @Override
+ public Future<Response> trace() {
+ return trace(Response.class);
+ }
+
+ @Override
+ public <T> Future<T> trace(Class<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public <T> Future<T> trace(GenericType<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public <T> Future<T> trace(InvocationCallback<T> callback) {
+ return method("TRACE", callback);
+ }
+
+ @Override
+ public Future<Response> method(String name) {
+ return method(name, Response.class);
+ }
+
+ @Override
+ public <T> Future<T> method(String name, Class<T> responseType) {
+ return wc.doInvokeAsync(name, null, null, null, responseType, responseType, null);
+ }
+
+ @Override
+ public <T> Future<T> method(String name, GenericType<T> responseType) {
+ return wc.doInvokeAsync(name, null, null, null, responseType.getRawType(),
+ responseType.getType(), null);
+ }
+
+ @Override
+ public <T> Future<T> method(String name, InvocationCallback<T> callback) {
+ return wc.doInvokeAsyncCallback(name, null, null, null, callback);
+ }
+
+ @Override
+ public Future<Response> method(String name, Entity<?> entity) {
+ return method(name, entity, Response.class);
+ }
+
+ @Override
+ public <T> Future<T> method(String name, Entity<?> entity, Class<T> responseType) {
+ return wc.doInvokeAsync(name,
+ entity,
+ null,
+ null, responseType, responseType, null);
+ }
+
+ @Override
+ public <T> Future<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+ return wc.doInvokeAsync(name,
+ entity,
+ null,
+ null, responseType.getRawType(), responseType.getType(), null);
+ }
+
+ @Override
+ public <T> Future<T> method(String name, Entity<?> entity, InvocationCallback<T> callback) {
+ return wc.doInvokeAsyncCallback(name,
+ entity,
+ null,
+ null,
+ callback);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
index dd2522e..f064e5d 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProviderFactory.java
@@ -25,6 +25,7 @@ import java.util.List;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.core.Configuration;
import org.apache.cxf.Bus;
@@ -42,7 +43,7 @@ public final class ClientProviderFactory extends ProviderFactory {
new ArrayList<ProviderInfo<ClientResponseFilter>>(1);
private List<ProviderInfo<ResponseExceptionMapper<?>>> responseExceptionMappers =
new ArrayList<ProviderInfo<ResponseExceptionMapper<?>>>(1);
-
+ private RxInvokerProvider<?> rxInvokerProvider;
private ClientProviderFactory(Bus bus) {
super(bus);
}
@@ -85,6 +86,10 @@ public final class ClientProviderFactory extends ProviderFactory {
if (ResponseExceptionMapper.class.isAssignableFrom(providerCls)) {
addProviderToList(responseExceptionMappers, provider);
}
+
+ if (RxInvokerProvider.class.isAssignableFrom(providerCls)) {
+ this.rxInvokerProvider = RxInvokerProvider.class.cast(provider.getProvider());
+ }
}
Collections.sort(clientRequestFilters,
new BindingPriorityComparator(ClientRequestFilter.class, true));
@@ -133,5 +138,8 @@ public final class ClientProviderFactory extends ProviderFactory {
return (Configuration)m.getExchange().getOutMessage()
.getContextualProperty(Configuration.class.getName());
}
-
+
+ public RxInvokerProvider<?> getRxInvokerProvider() {
+ return rxInvokerProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
new file mode 100644
index 0000000..6071511
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.CompletionStageRxInvoker;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+public class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
+ private WebClient wc;
+ private ExecutorService ex;
+ CompletionStageRxInvokerImpl(WebClient wc, ExecutorService ex) {
+ this.ex = ex;
+ this.wc = wc;
+ }
+
+ @Override
+ public CompletionStage<Response> get() {
+ return get(Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> get(Class<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> get(GenericType<T> responseType) {
+ return method(HttpMethod.GET, responseType);
+ }
+
+ @Override
+ public CompletionStage<Response> put(Entity<?> entity) {
+ return put(entity, Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> put(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> put(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.PUT, entity, responseType);
+ }
+
+ @Override
+ public CompletionStage<Response> post(Entity<?> entity) {
+ return post(entity, Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> post(Entity<?> entity, Class<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> post(Entity<?> entity, GenericType<T> responseType) {
+ return method(HttpMethod.POST, entity, responseType);
+ }
+
+ @Override
+ public CompletionStage<Response> delete() {
+ return delete(Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> delete(Class<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> delete(GenericType<T> responseType) {
+ return method(HttpMethod.DELETE, responseType);
+ }
+
+ @Override
+ public CompletionStage<Response> head() {
+ return method(HttpMethod.HEAD);
+ }
+
+ @Override
+ public CompletionStage<Response> options() {
+ return options(Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> options(Class<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> options(GenericType<T> responseType) {
+ return method(HttpMethod.OPTIONS, responseType);
+ }
+
+ @Override
+ public CompletionStage<Response> trace() {
+ return trace(Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> trace(Class<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public <T> CompletionStage<T> trace(GenericType<T> responseType) {
+ return method("TRACE", responseType);
+ }
+
+ @Override
+ public CompletionStage<Response> method(String name) {
+ return method(name, Response.class);
+ }
+
+ @Override
+ public CompletionStage<Response> method(String name, Entity<?> entity) {
+ return method(name, entity, Response.class);
+ }
+
+ @Override
+ public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) {
+ return wc.doInvokeAsyncStage(name, entity, responseType, responseType, ex);
+ }
+
+ @Override
+ public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+ return wc.doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex);
+ }
+
+ @Override
+ public <T> CompletionStage<T> method(String name, Class<T> responseType) {
+ return wc.doInvokeAsyncStage(name, null, responseType, responseType, ex);
+ }
+
+ @Override
+ public <T> CompletionStage<T> method(String name, GenericType<T> responseType) {
+ return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
new file mode 100644
index 0000000..e3133cd
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/SyncInvokerImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+public class SyncInvokerImpl implements SyncInvoker {
+ private WebClient wc;
+ public SyncInvokerImpl(WebClient wc) {
+ this.wc = wc;
+ }
+
+ @Override
+ public Response delete() {
+ return method(HttpMethod.DELETE);
+ }
+
+ @Override
+ public <T> T delete(Class<T> cls) {
+ return method(HttpMethod.DELETE, cls);
+ }
+
+ @Override
+ public <T> T delete(GenericType<T> genericType) {
+ return method(HttpMethod.DELETE, genericType);
+ }
+
+ @Override
+ public Response get() {
+ return method(HttpMethod.GET);
+ }
+
+ @Override
+ public <T> T get(Class<T> cls) {
+ return method(HttpMethod.GET, cls);
+ }
+
+ @Override
+ public <T> T get(GenericType<T> genericType) {
+ return method(HttpMethod.GET, genericType);
+ }
+
+ @Override
+ public Response head() {
+ return method(HttpMethod.HEAD);
+ }
+
+ @Override
+ public Response options() {
+ return method(HttpMethod.OPTIONS);
+ }
+
+ @Override
+ public <T> T options(Class<T> cls) {
+ return method(HttpMethod.OPTIONS, cls);
+ }
+
+ @Override
+ public <T> T options(GenericType<T> genericType) {
+ return method(HttpMethod.OPTIONS, genericType);
+ }
+
+ @Override
+ public Response post(Entity<?> entity) {
+ return method(HttpMethod.POST, entity);
+ }
+
+ @Override
+ public <T> T post(Entity<?> entity, Class<T> cls) {
+ return method(HttpMethod.POST, entity, cls);
+ }
+
+ @Override
+ public <T> T post(Entity<?> entity, GenericType<T> genericType) {
+ return method(HttpMethod.POST, entity, genericType);
+ }
+
+ @Override
+ public Response put(Entity<?> entity) {
+ return method(HttpMethod.PUT, entity);
+ }
+
+ @Override
+ public <T> T put(Entity<?> entity, Class<T> cls) {
+ return method(HttpMethod.PUT, entity, cls);
+ }
+
+ @Override
+ public <T> T put(Entity<?> entity, GenericType<T> genericType) {
+ return method(HttpMethod.PUT, entity, genericType);
+ }
+
+ @Override
+ public Response trace() {
+ return method("TRACE");
+ }
+
+ @Override
+ public <T> T trace(Class<T> cls) {
+ return method("TRACE", cls);
+ }
+
+ @Override
+ public <T> T trace(GenericType<T> genericType) {
+ return method("TRACE", genericType);
+ }
+
+ @Override
+ public Response method(String method) {
+ return method(method, Response.class);
+ }
+
+ @Override
+ public <T> T method(String method, Class<T> cls) {
+ return wc.invoke(method, null, cls);
+ }
+
+ @Override
+ public <T> T method(String method, GenericType<T> genericType) {
+ return wc.invoke(method, null, genericType);
+ }
+
+ @Override
+ public Response method(String method, Entity<?> entity) {
+ return method(method, entity, Response.class);
+ }
+
+ @Override
+ public <T> T method(String method, Entity<?> entity, Class<T> cls) {
+ return wc.invoke(method, entity, cls);
+ }
+
+ @Override
+ public <T> T method(String method, Entity<?> entity, GenericType<T> genericType) {
+ return wc.invoke(method, entity, genericType);
+ }
+
+ public WebClient getWebClient() {
+ return wc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
index f7ed1f2..23349f3 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
@@ -1260,12 +1260,12 @@ public class WebClient extends AbstractClient implements AsyncClient {
// Link to JAX-RS 2.0 AsyncInvoker
public AsyncInvoker async() {
- return new AsyncInvokerImpl();
+ return new AsyncInvokerImpl(this);
}
// Link to JAX-RS 2.0 SyncInvoker
public SyncInvoker sync() {
- return new SyncInvokerImpl();
+ return new SyncInvokerImpl(this);
}
// Link to JAX-RS 2.1 CompletionStageRxInvoker
@@ -1273,17 +1273,28 @@ public class WebClient extends AbstractClient implements AsyncClient {
return rx((ExecutorService)null);
}
public CompletionStageRxInvoker rx(ExecutorService ex) {
- return new CompletionStageRxInvokerImpl(ex);
+ return new CompletionStageRxInvokerImpl(this, ex);
}
// Link to JAX-RS 2.1 RxInvoker extensions
@SuppressWarnings("rawtypes")
- public <T extends RxInvoker> T rx(RxInvokerProvider<T> p) {
- return rx(p, (ExecutorService)null);
+ public <T extends RxInvoker> T rx(Class<T> rxCls) {
+ return rx(rxCls, (ExecutorService)null);
}
- @SuppressWarnings("rawtypes")
- public <T extends RxInvoker> T rx(RxInvokerProvider<T> p, ExecutorService execService) {
- return p.getRxInvoker(new InvocationBuilderImpl(this), execService);
+ @SuppressWarnings({
+ "rawtypes", "unchecked"
+ })
+ public <T extends RxInvoker> T rx(Class<T> rxCls, ExecutorService executorService) {
+ if (CompletionStageRxInvoker.class.isAssignableFrom(rxCls)) {
+ return (T)rx(executorService);
+ }
+ ClientProviderFactory pf =
+ ClientProviderFactory.getInstance(WebClient.getConfig(this).getEndpoint());
+ RxInvokerProvider rxProvider = pf.getRxInvokerProvider();
+ if (rxProvider != null && rxProvider.isProviderFor(rxCls)) {
+ return (T)rxProvider.getRxInvoker(sync(), executorService);
+ }
+ throw new IllegalStateException("Provider for " + rxCls.getName() + " is not available");
}
private void setEntityHeaders(Entity<?> entity) {
@@ -1296,450 +1307,4 @@ public class WebClient extends AbstractClient implements AsyncClient {
}
}
- class AsyncInvokerImpl implements AsyncInvoker {
-
- @Override
- public Future<Response> get() {
- return get(Response.class);
- }
-
- @Override
- public <T> Future<T> get(Class<T> responseType) {
- return method(HttpMethod.GET, responseType);
- }
-
- @Override
- public <T> Future<T> get(GenericType<T> responseType) {
- return method(HttpMethod.GET, responseType);
- }
-
- @Override
- public <T> Future<T> get(InvocationCallback<T> callback) {
- return method(HttpMethod.GET, callback);
- }
-
- @Override
- public Future<Response> put(Entity<?> entity) {
- return put(entity, Response.class);
- }
-
- @Override
- public <T> Future<T> put(Entity<?> entity, Class<T> responseType) {
- return method(HttpMethod.PUT, entity, responseType);
- }
-
- @Override
- public <T> Future<T> put(Entity<?> entity, GenericType<T> responseType) {
- return method(HttpMethod.PUT, entity, responseType);
- }
-
- @Override
- public <T> Future<T> put(Entity<?> entity, InvocationCallback<T> callback) {
- return method(HttpMethod.PUT, entity, callback);
- }
-
- @Override
- public Future<Response> post(Entity<?> entity) {
- return post(entity, Response.class);
- }
-
- @Override
- public <T> Future<T> post(Entity<?> entity, Class<T> responseType) {
- return method(HttpMethod.POST, entity, responseType);
- }
-
- @Override
- public <T> Future<T> post(Entity<?> entity, GenericType<T> responseType) {
- return method(HttpMethod.POST, entity, responseType);
- }
-
- @Override
- public <T> Future<T> post(Entity<?> entity, InvocationCallback<T> callback) {
- return method(HttpMethod.POST, entity, callback);
- }
-
- @Override
- public Future<Response> delete() {
- return delete(Response.class);
- }
-
- @Override
- public <T> Future<T> delete(Class<T> responseType) {
- return method(HttpMethod.DELETE, responseType);
- }
-
- @Override
- public <T> Future<T> delete(GenericType<T> responseType) {
- return method(HttpMethod.DELETE, responseType);
- }
-
- @Override
- public <T> Future<T> delete(InvocationCallback<T> callback) {
- return method(HttpMethod.DELETE, callback);
- }
-
- @Override
- public Future<Response> head() {
- return method(HttpMethod.HEAD);
- }
-
- @Override
- public Future<Response> head(InvocationCallback<Response> callback) {
- return method(HttpMethod.HEAD, callback);
- }
-
- @Override
- public Future<Response> options() {
- return options(Response.class);
- }
-
- @Override
- public <T> Future<T> options(Class<T> responseType) {
- return method(HttpMethod.OPTIONS, responseType);
- }
-
- @Override
- public <T> Future<T> options(GenericType<T> responseType) {
- return method(HttpMethod.OPTIONS, responseType);
- }
-
- @Override
- public <T> Future<T> options(InvocationCallback<T> callback) {
- return method(HttpMethod.OPTIONS, callback);
- }
-
- @Override
- public Future<Response> trace() {
- return trace(Response.class);
- }
-
- @Override
- public <T> Future<T> trace(Class<T> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public <T> Future<T> trace(GenericType<T> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public <T> Future<T> trace(InvocationCallback<T> callback) {
- return method("TRACE", callback);
- }
-
- @Override
- public Future<Response> method(String name) {
- return method(name, Response.class);
- }
-
- @Override
- public <T> Future<T> method(String name, Class<T> responseType) {
- return doInvokeAsync(name, null, null, null, responseType, responseType, null);
- }
-
- @Override
- public <T> Future<T> method(String name, GenericType<T> responseType) {
- return doInvokeAsync(name, null, null, null, responseType.getRawType(),
- responseType.getType(), null);
- }
-
- @Override
- public <T> Future<T> method(String name, InvocationCallback<T> callback) {
- return doInvokeAsyncCallback(name, null, null, null, callback);
- }
-
- @Override
- public Future<Response> method(String name, Entity<?> entity) {
- return method(name, entity, Response.class);
- }
-
- @Override
- public <T> Future<T> method(String name, Entity<?> entity, Class<T> responseType) {
- return doInvokeAsync(name,
- entity,
- null,
- null, responseType, responseType, null);
- }
-
- @Override
- public <T> Future<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
- return doInvokeAsync(name,
- entity,
- null,
- null, responseType.getRawType(), responseType.getType(), null);
- }
-
- @Override
- public <T> Future<T> method(String name, Entity<?> entity, InvocationCallback<T> callback) {
- return doInvokeAsyncCallback(name,
- entity,
- null,
- null,
- callback);
- }
- }
-
-
-
- class SyncInvokerImpl implements SyncInvoker {
-
- @Override
- public Response delete() {
- return method(HttpMethod.DELETE);
- }
-
- @Override
- public <T> T delete(Class<T> cls) {
- return method(HttpMethod.DELETE, cls);
- }
-
- @Override
- public <T> T delete(GenericType<T> genericType) {
- return method(HttpMethod.DELETE, genericType);
- }
-
- @Override
- public Response get() {
- return method(HttpMethod.GET);
- }
-
- @Override
- public <T> T get(Class<T> cls) {
- return method(HttpMethod.GET, cls);
- }
-
- @Override
- public <T> T get(GenericType<T> genericType) {
- return method(HttpMethod.GET, genericType);
- }
-
- @Override
- public Response head() {
- return method(HttpMethod.HEAD);
- }
-
- @Override
- public Response options() {
- return method(HttpMethod.OPTIONS);
- }
-
- @Override
- public <T> T options(Class<T> cls) {
- return method(HttpMethod.OPTIONS, cls);
- }
-
- @Override
- public <T> T options(GenericType<T> genericType) {
- return method(HttpMethod.OPTIONS, genericType);
- }
-
- @Override
- public Response post(Entity<?> entity) {
- return method(HttpMethod.POST, entity);
- }
-
- @Override
- public <T> T post(Entity<?> entity, Class<T> cls) {
- return method(HttpMethod.POST, entity, cls);
- }
-
- @Override
- public <T> T post(Entity<?> entity, GenericType<T> genericType) {
- return method(HttpMethod.POST, entity, genericType);
- }
-
- @Override
- public Response put(Entity<?> entity) {
- return method(HttpMethod.PUT, entity);
- }
-
- @Override
- public <T> T put(Entity<?> entity, Class<T> cls) {
- return method(HttpMethod.PUT, entity, cls);
- }
-
- @Override
- public <T> T put(Entity<?> entity, GenericType<T> genericType) {
- return method(HttpMethod.PUT, entity, genericType);
- }
-
- @Override
- public Response trace() {
- return method("TRACE");
- }
-
- @Override
- public <T> T trace(Class<T> cls) {
- return method("TRACE", cls);
- }
-
- @Override
- public <T> T trace(GenericType<T> genericType) {
- return method("TRACE", genericType);
- }
-
- @Override
- public Response method(String method) {
- return method(method, Response.class);
- }
-
- @Override
- public <T> T method(String method, Class<T> cls) {
- return invoke(method, null, cls);
- }
-
- @Override
- public <T> T method(String method, GenericType<T> genericType) {
- return invoke(method, null, genericType);
- }
-
- @Override
- public Response method(String method, Entity<?> entity) {
- return method(method, entity, Response.class);
- }
-
- @Override
- public <T> T method(String method, Entity<?> entity, Class<T> cls) {
- return invoke(method, entity, cls);
- }
-
- @Override
- public <T> T method(String method, Entity<?> entity, GenericType<T> genericType) {
- return invoke(method, entity, genericType);
- }
- }
-
- class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
- private ExecutorService ex;
- CompletionStageRxInvokerImpl(ExecutorService ex) {
- this.ex = ex;
- }
-
- @Override
- public CompletionStage<Response> get() {
- return get(Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> get(Class<T> responseType) {
- return method(HttpMethod.GET, responseType);
- }
-
- @Override
- public <T> CompletionStage<T> get(GenericType<T> responseType) {
- return method(HttpMethod.GET, responseType);
- }
-
- @Override
- public CompletionStage<Response> put(Entity<?> entity) {
- return put(entity, Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> put(Entity<?> entity, Class<T> responseType) {
- return method(HttpMethod.PUT, entity, responseType);
- }
-
- @Override
- public <T> CompletionStage<T> put(Entity<?> entity, GenericType<T> responseType) {
- return method(HttpMethod.PUT, entity, responseType);
- }
-
- @Override
- public CompletionStage<Response> post(Entity<?> entity) {
- return post(entity, Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> post(Entity<?> entity, Class<T> responseType) {
- return method(HttpMethod.POST, entity, responseType);
- }
-
- @Override
- public <T> CompletionStage<T> post(Entity<?> entity, GenericType<T> responseType) {
- return method(HttpMethod.POST, entity, responseType);
- }
-
- @Override
- public CompletionStage<Response> delete() {
- return delete(Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> delete(Class<T> responseType) {
- return method(HttpMethod.DELETE, responseType);
- }
-
- @Override
- public <T> CompletionStage<T> delete(GenericType<T> responseType) {
- return method(HttpMethod.DELETE, responseType);
- }
-
- @Override
- public CompletionStage<Response> head() {
- return method(HttpMethod.HEAD);
- }
-
- @Override
- public CompletionStage<Response> options() {
- return options(Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> options(Class<T> responseType) {
- return method(HttpMethod.OPTIONS, responseType);
- }
-
- @Override
- public <T> CompletionStage<T> options(GenericType<T> responseType) {
- return method(HttpMethod.OPTIONS, responseType);
- }
-
- @Override
- public CompletionStage<Response> trace() {
- return trace(Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> trace(Class<T> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public <T> CompletionStage<T> trace(GenericType<T> responseType) {
- return method("TRACE", responseType);
- }
-
- @Override
- public CompletionStage<Response> method(String name) {
- return method(name, Response.class);
- }
-
- @Override
- public CompletionStage<Response> method(String name, Entity<?> entity) {
- return method(name, entity, Response.class);
- }
-
- @Override
- public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) {
- return doInvokeAsyncStage(name, entity, responseType, responseType, ex);
- }
-
- @Override
- public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
- return doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex);
- }
-
- @Override
- public <T> CompletionStage<T> method(String name, Class<T> responseType) {
- return doInvokeAsyncStage(name, null, responseType, responseType, ex);
- }
-
- @Override
- public <T> CompletionStage<T> method(String name, GenericType<T> responseType) {
- return doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index ea84b46..b8c870a 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.ws.rs.HttpMethod;
-import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.AsyncInvoker;
import javax.ws.rs.client.CompletionStageRxInvoker;
import javax.ws.rs.client.Entity;
@@ -35,7 +34,6 @@ import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.NioInvoker;
import javax.ws.rs.client.RxInvoker;
-import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Cookie;
@@ -384,40 +382,25 @@ public class InvocationBuilderImpl implements Invocation.Builder {
@Override
public CompletionStageRxInvoker rx(ExecutorService executorService) {
+ // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+ // Investigate if letting the CompletableFuture thread pool deal with the sync invocation
+ // is indeed more effective
+
return webClient.rx(executorService);
}
@SuppressWarnings("rawtypes")
@Override
- public <T extends RxInvoker> T rx(Class<? extends RxInvokerProvider<T>> pClass) {
- return rx(pClass, (ExecutorService)null);
+ public <T extends RxInvoker> T rx(Class<T> rxCls) {
+ return rx(rxCls, (ExecutorService)null);
}
@SuppressWarnings("rawtypes")
@Override
- public <T extends RxInvoker> T rx(Class<? extends RxInvokerProvider<T>> pClass, ExecutorService execService) {
- RxInvokerProvider<T> p = null;
- try {
- p = pClass.newInstance();
- } catch (Throwable t) {
- throw new ProcessingException(t);
- }
- return rx(p, execService);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public <T extends RxInvoker> T rx(RxInvokerProvider<T> p) {
- return rx(p, (ExecutorService)null);
+ public <T extends RxInvoker> T rx(Class<T> rxCls, ExecutorService executorService) {
+ return webClient.rx(rxCls, executorService);
}
-
- @SuppressWarnings("rawtypes")
- @Override
- public <T extends RxInvoker> T rx(RxInvokerProvider<T> p, ExecutorService execService) {
- return p.getRxInvoker(this, execService);
- }
-
@Override
public NioInvoker nio() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
index 100fca2..63f60a6 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerProvider.java
@@ -20,16 +20,24 @@ package org.apache.cxf.jaxrs.rx.client;
import java.util.concurrent.ExecutorService;
-import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
-import org.apache.cxf.jaxrs.client.spec.InvocationBuilderImpl;
+import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
@Override
- public ObservableRxInvoker getRxInvoker(Invocation.Builder builder, ExecutorService execService) {
- return new ObservableRxInvokerImpl(((InvocationBuilderImpl)builder).getWebClient(), execService);
+ public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+ // TODO: At the moment we still delegate if possible to the async HTTP conduit.
+ // Investigate if letting the RxJava thread pool deal with the sync invocation
+ // is indeed more effective
+ return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService);
+ }
+
+ @Override
+ public boolean isProviderFor(Class<?> rxCls) {
+ return ObservableRxInvoker.class == rxCls;
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/ea8b6e27/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index ec25b17..0508d67 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -32,6 +32,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -127,9 +128,10 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
@Test
public void testGetHelloWorldAsyncObservable() throws Exception {
String address = "http://localhost:" + PORT + "/reactive/textAsync";
- WebClient wc = WebClient.create(address);
+ WebClient wc = WebClient.create(address,
+ Collections.singletonList(new ObservableRxInvokerProvider()));
Observable<String> obs = wc.accept("text/plain")
- .rx(new ObservableRxInvokerProvider())
+ .rx(ObservableRxInvoker.class)
.get(String.class);
obs.map(s -> {
return s + s;
@@ -142,8 +144,9 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase {
@Test
public void testGetHelloWorldAsyncObservable404() throws Exception {
String address = "http://localhost:" + PORT + "/reactive/textAsync404";
- Invocation.Builder b = ClientBuilder.newClient().target(address).request();
- b.rx(ObservableRxInvokerProvider.class).get(String.class).subscribe(
+ Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
+ .target(address).request();
+ b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
s -> {
fail("Exception expected");
},