You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2013/08/15 16:32:02 UTC
svn commit: r1514298 - in /cxf/trunk/rt/transports/http-netty/netty-client:
./ src/main/java/org/apache/cxf/transport/http/netty/client/
src/test/java/org/apache/cxf/transport/http/netty/client/integration/
Author: ningjiang
Date: Thu Aug 15 14:32:02 2013
New Revision: 1514298
URL: http://svn.apache.org/r1514298
Log:
CXF-5177 Upgraded netty version of netty-client to 4.0.x
Modified:
cxf/trunk/rt/transports/http-netty/netty-client/pom.xml
cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java
cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java
cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java
cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java
Modified: cxf/trunk/rt/transports/http-netty/netty-client/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/pom.xml?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/pom.xml (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/pom.xml Thu Aug 15 14:32:02 2013
@@ -35,8 +35,8 @@
</parent>
<properties>
- <cxf.osgi.import>
- org.jboss.netty.*;version="${cxf.netty.version.range}",
+ <cxf.osgi.import>
+ io.netty.*;version="${cxf.netty.version.range}",
javax.annotation;version="${cxf.osgi.javax.annotation.version}",
</cxf.osgi.import>
<cxf.osgi.export>
@@ -53,8 +53,8 @@
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.6.5.Final</version>
+ <artifactId>netty-codec-http</artifactId>
+ <version>${cxf.netty.version}</version>
</dependency>
<dependency>
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/CxfResponseCallBack.java Thu Aug 15 14:32:02 2013
@@ -19,7 +19,7 @@
package org.apache.cxf.transport.http.netty.client;
-import org.jboss.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponse;
public interface CxfResponseCallBack {
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientHandler.java Thu Aug 15 14:32:02 2013
@@ -19,51 +19,60 @@
package org.apache.cxf.transport.http.netty.client;
-
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-
-public class NettyHttpClientHandler extends SimpleChannelHandler {
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.HttpResponse;
+
+public class NettyHttpClientHandler extends ChannelDuplexHandler {
+ private final BlockingQueue<NettyHttpClientRequest> sendedQueue =
+ new LinkedBlockingDeque<NettyHttpClientRequest>();
+
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- HttpResponse response = (HttpResponse)e.getMessage();
- Channel ch = ctx.getChannel();
- @SuppressWarnings("unchecked")
- BlockingQueue<NettyHttpClientRequest> sendedQueue = (BlockingQueue<NettyHttpClientRequest>)ch.getAttachment();
- NettyHttpClientRequest request = sendedQueue.poll();
- request.setResponse(response);
- // calling the callback here
- request.getCxfResponseCallback().responseReceived(response);
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+ if (msg instanceof HttpResponse) {
+ // just make sure we can combine the request and response together
+ HttpResponse response = (HttpResponse)msg;
+ NettyHttpClientRequest request = sendedQueue.poll();
+ request.setResponse(response);
+ // calling the callback here
+ request.getCxfResponseCallback().responseReceived(response);
+ } else {
+ super.channelRead(ctx, msg);
+ }
}
@Override
- public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- Object p = e.getMessage();
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+
// need to deal with the request
- if (p instanceof NettyHttpClientRequest) {
- NettyHttpClientRequest request = (NettyHttpClientRequest)e.getMessage();
- Channel ch = ctx.getChannel();
- @SuppressWarnings("unchecked")
- BlockingQueue<NettyHttpClientRequest> sendedQueue =
- (BlockingQueue<NettyHttpClientRequest>)ch.getAttachment();
- if (sendedQueue == null) {
- sendedQueue = new LinkedBlockingDeque<NettyHttpClientRequest>();
- ch.setAttachment(sendedQueue);
- }
+ if (msg instanceof NettyHttpClientRequest) {
+ NettyHttpClientRequest request = (NettyHttpClientRequest)msg;
sendedQueue.put(request);
- ctx.getChannel().write(request.getRequest());
+ ctx.writeAndFlush(request.getRequest());
} else {
- super.writeRequested(ctx, e);
+ super.write(ctx, msg, promise);
}
}
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ //TODO need to handle the exception here
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ ctx.flush();
+ }
+
+
}
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java Thu Aug 15 14:32:02 2013
@@ -25,31 +25,31 @@ import java.util.logging.Logger;
import javax.net.ssl.SSLEngine;
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.configuration.jsse.TLSClientParameters;
import org.apache.cxf.transport.https.SSLUtils;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-public class NettyHttpClientPipelineFactory implements ChannelPipelineFactory {
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+
+public class NettyHttpClientPipelineFactory extends ChannelInitializer<Channel> {
private static final Logger LOG =
LogUtils.getL7dLogger(NettyHttpClientPipelineFactory.class);
+ private final NettyHttpConduit httpConduit;
- private final TLSClientParameters tlsClientParameters;
-
- public NettyHttpClientPipelineFactory(TLSClientParameters tlsClientParameters) {
- this.tlsClientParameters = tlsClientParameters;
+ public NettyHttpClientPipelineFactory(NettyHttpConduit httpConduit) {
+ this.httpConduit = httpConduit;
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
SslHandler sslHandler = configureClientSSLOnDemand();
if (sslHandler != null) {
@@ -59,18 +59,18 @@ public class NettyHttpClientPipelineFact
pipeline.addLast("ssl", sslHandler);
}
+
pipeline.addLast("decoder", new HttpResponseDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+ // TODO need to configure the aggregator size
+ pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("client", new NettyHttpClientHandler());
- return pipeline;
-
}
private SslHandler configureClientSSLOnDemand() throws Exception {
- if (tlsClientParameters != null) {
- SSLEngine sslEngine = SSLUtils.createClientSSLEngine(tlsClientParameters);
+ if (httpConduit.getTlsClientParameters() != null) {
+ SSLEngine sslEngine = SSLUtils.createClientSSLEngine(httpConduit.getTlsClientParameters());
return new SslHandler(sslEngine);
} else {
return null;
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientRequest.java Thu Aug 15 14:32:02 2013
@@ -20,39 +20,43 @@
package org.apache.cxf.transport.http.netty.client;
import java.net.URI;
-import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpVersion;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpVersion;
public class NettyHttpClientRequest {
private HttpRequest request;
private HttpResponse response;
private URI uri;
+ private String method;
private CxfResponseCallBack cxfResponseCallback;
private int connectionTimeout;
private int receiveTimeout;
public NettyHttpClientRequest(URI requestUri, String method) {
this.uri = requestUri;
+ this.method = method;
+ }
+
+ public void createRequest(ByteBuf content) {
this.request =
- new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.getPath().toString());
+ new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+ HttpMethod.valueOf(method),
+ uri.getPath().toString(), content);
// setup the default headers
- request.setHeader("Connection", "keep-alive");
- request.setHeader("Host", uri.getHost() + ":" + uri.getPort());
-
+ request.headers().set("Connection", "keep-alive");
+ request.headers().set("Host", uri.getHost() + ":" + uri.getPort());
}
public HttpRequest getRequest() {
return request;
}
-
- public void setRequest(HttpRequest request) {
- this.request = request;
- }
-
+
public HttpResponse getResponse() {
return response;
}
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java Thu Aug 15 14:32:02 2013
@@ -40,6 +40,7 @@ import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSession;
import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.HttpHeaderHelper;
import org.apache.cxf.io.CacheAndWriteOutputStream;
@@ -53,43 +54,52 @@ import org.apache.cxf.transport.https.Ht
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.version.Version;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.ssl.SslHandler;
-public class NettyHttpConduit extends URLConnectionHTTPConduit {
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslHandler;
+
+public class NettyHttpConduit extends URLConnectionHTTPConduit implements BusLifeCycleListener {
public static final String USE_ASYNC = "use.async.http.conduit";
final NettyHttpConduitFactory factory;
- private final ClientBootstrap bootstrap;
+ private Bootstrap bootstrap;
+ // TODO do we need to use the EventLoopGroup from NettyHttpConduitFactory?
+ private final EventLoopGroup group;
public NettyHttpConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, NettyHttpConduitFactory conduitFactory)
throws IOException {
super(b, ei, t);
factory = conduitFactory;
- bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
+ group = new NioEventLoopGroup();
+ bootstrap = new Bootstrap();
+ bootstrap.group(group);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new NettyHttpClientPipelineFactory(this));
}
-
+
public NettyHttpConduitFactory getNettyHttpConduitFactory() {
return factory;
}
public void close() {
super.close();
- // clean up the resource that ClientChannelFactory used
- bootstrap.shutdown();
+ // clean up the threads of the group
+ group.shutdownGracefully().syncUninterruptibly();
}
// Using Netty API directly
protected void setupConnection(Message message, URI uri, HTTPClientPolicy csPolicy) throws IOException {
-
// need to do some clean up work on the URI address
String uriString = uri.toString();
if (uriString.startsWith("netty://")) {
@@ -122,12 +132,9 @@ public class NettyHttpConduit extends UR
final int rtimeout = determineReceiveTimeout(message, csPolicy);
request.setConnectionTimeout(ctimeout);
request.setReceiveTimeout(rtimeout);
- request.getRequest().setChunked(true);
- request.getRequest().setHeader(Message.CONTENT_TYPE, (String)message.get(Message.CONTENT_TYPE));
- // need to socket connection timeout
-
+
message.put(NettyHttpClientRequest.class, request);
- bootstrap.setPipelineFactory(new NettyHttpClientPipelineFactory(getTlsClientParameters()));
+
}
protected OutputStream createOutputStream(Message message,
@@ -142,7 +149,10 @@ public class NettyHttpConduit extends UR
chunkThreshold,
getConduitName(),
entity.getUri());
- entity.getRequest().setContent(out.getOutBuffer());
+ entity.createRequest(out.getOutBuffer());
+ // TODO need to check how to set the Chunked feature
+ //request.getRequest().setChunked(true);
+ entity.getRequest().headers().set(Message.CONTENT_TYPE, (String)message.get(Message.CONTENT_TYPE));
return out;
@@ -156,7 +166,7 @@ public class NettyHttpConduit extends UR
volatile Channel channel;
volatile SSLSession session;
boolean isAsync;
- ChannelBuffer outBuffer;
+ ByteBuf outBuffer;
OutputStream outputStream;
protected NettyWrappedOutputStream(Message message, boolean possibleRetransmit,
@@ -165,11 +175,11 @@ public class NettyHttpConduit extends UR
csPolicy = getClient(message);
entity = message.get(NettyHttpClientRequest.class);
int bufSize = csPolicy.getChunkLength() > 0 ? csPolicy.getChunkLength() : 16320;
- outBuffer = ChannelBuffers.dynamicBuffer(bufSize);
- outputStream = new ChannelBufferOutputStream(outBuffer);
+ outBuffer = Unpooled.buffer(bufSize);
+ outputStream = new ByteBufOutputStream(outBuffer);
}
- protected ChannelBuffer getOutBuffer() {
+ protected ByteBuf getOutBuffer() {
return outBuffer;
}
@@ -200,6 +210,10 @@ public class NettyHttpConduit extends UR
}
return httpResponse;
}
+
+ protected HttpContent getHttpResponseContent() throws IOException {
+ return (HttpContent) getHttpResponse();
+ }
protected synchronized Channel getChannel() throws IOException {
@@ -248,7 +262,7 @@ public class NettyHttpConduit extends UR
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- setException(future.getCause());
+ setException(future.cause());
}
}
};
@@ -277,13 +291,13 @@ public class NettyHttpConduit extends UR
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
- setChannel(future.getChannel());
- SslHandler sslHandler = channel.getPipeline().get(SslHandler.class);
+ setChannel(future.channel());
+ SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
if (sslHandler != null) {
- session = sslHandler.getEngine().getSession();
+ session = sslHandler.engine().getSession();
}
} else {
- setException((Exception) future.getCause());
+ setException((Exception) future.cause());
}
}
};
@@ -291,9 +305,9 @@ public class NettyHttpConduit extends UR
connFuture.addListener(listener);
if (!output) {
- entity.getRequest().removeHeader("Transfer-Encoding");
- entity.getRequest().removeHeader("Content-Type");
- entity.getRequest().setContent(null);
+ entity.getRequest().headers().remove("Transfer-Encoding");
+ entity.getRequest().headers().remove("Content-Type");
+ entity.getRequest().headers().remove(null);
}
// setup the CxfResponseCallBack
@@ -347,7 +361,7 @@ public class NettyHttpConduit extends UR
@Override
protected void setProtocolHeaders() throws IOException {
Headers h = new Headers(outMessage);
- entity.getRequest().setHeader(Message.CONTENT_TYPE, h.determineContentType());
+ entity.getRequest().headers().set(Message.CONTENT_TYPE, h.determineContentType());
boolean addHeaders = MessageUtils.isTrue(outMessage.getContextualProperty(Headers.ADD_HEADERS_PROPERTY));
for (Map.Entry<String, List<String>> header : h.headerMap().entrySet()) {
@@ -356,7 +370,7 @@ public class NettyHttpConduit extends UR
}
if (addHeaders || HttpHeaderHelper.COOKIE.equalsIgnoreCase(header.getKey())) {
for (String s : header.getValue()) {
- entity.getRequest().addHeader(HttpHeaderHelper.COOKIE, s);
+ entity.getRequest().headers().add(HttpHeaderHelper.COOKIE, s);
}
} else if (!"Content-Length".equalsIgnoreCase(header.getKey())) {
StringBuilder b = new StringBuilder();
@@ -366,10 +380,10 @@ public class NettyHttpConduit extends UR
b.append(',');
}
}
- entity.getRequest().setHeader(header.getKey(), b.toString());
+ entity.getRequest().headers().set(header.getKey(), b.toString());
}
- if (!entity.getRequest().containsHeader("User-Agent")) {
- entity.getRequest().setHeader("User-Agent", Version.getCompleteVersionString());
+ if (!entity.getRequest().headers().contains("User-Agent")) {
+ entity.getRequest().headers().set("User-Agent", Version.getCompleteVersionString());
}
}
}
@@ -377,18 +391,19 @@ public class NettyHttpConduit extends UR
@Override
protected void setFixedLengthStreamingMode(int i) {
// Here we can set the Content-Length
- entity.getRequest().setHeader("Content-Length", i);
- entity.getRequest().setChunked(false);
+ entity.getRequest().headers().set("Content-Length", i);
+ // TODO we need to deal with the Chunked information ourselves
+ //entity.getRequest().setChunked(false);
}
@Override
protected int getResponseCode() throws IOException {
- return getHttpResponse().getStatus().getCode();
+ return getHttpResponse().getStatus().code();
}
@Override
protected String getResponseMessage() throws IOException {
- return getHttpResponse().getStatus().getReasonPhrase();
+ return getHttpResponse().getStatus().reasonPhrase();
}
@Override
@@ -399,13 +414,13 @@ public class NettyHttpConduit extends UR
}
private String readHeaders(Headers h) throws IOException {
- Set<String> headerNames = getHttpResponse().getHeaderNames();
+ Set<String> headerNames = getHttpResponse().headers().names();
String ct = null;
for (String name : headerNames) {
- List<String> s = getHttpResponse().getHeaders(name);
+ List<String> s = getHttpResponse().headers().getAll(name);
h.headerMap().put(name, s);
if (Message.CONTENT_TYPE.equalsIgnoreCase(name)) {
- ct = getHttpResponse().getHeader(name);
+ ct = getHttpResponse().headers().get(name);
}
}
return ct;
@@ -419,7 +434,7 @@ public class NettyHttpConduit extends UR
@Override
protected void closeInputStream() throws IOException {
//We just clear the buffer
- getHttpResponse().getContent().clear();
+ getHttpResponseContent().content().clear();
}
@Override
@@ -430,7 +445,7 @@ public class NettyHttpConduit extends UR
@Override
protected InputStream getInputStream() throws IOException {
- return new ChannelBufferInputStream(getHttpResponse().getContent());
+ return new ByteBufInputStream(getHttpResponseContent().content());
}
@Override
@@ -440,14 +455,14 @@ public class NettyHttpConduit extends UR
if (responseCode == HttpURLConnection.HTTP_ACCEPTED
|| responseCode == HttpURLConnection.HTTP_OK) {
- String head = httpResponse.getHeader(HttpHeaderHelper.CONTENT_LENGTH);
+ String head = httpResponse.headers().get(HttpHeaderHelper.CONTENT_LENGTH);
int cli = 0;
if (head != null) {
cli = Integer.parseInt(head);
}
- head = httpResponse.getHeader(HttpHeaderHelper.TRANSFER_ENCODING);
+ head = httpResponse.headers().get(HttpHeaderHelper.TRANSFER_ENCODING);
boolean isChunked = head != null && HttpHeaderHelper.CHUNKED.equalsIgnoreCase(head);
- head = httpResponse.getHeader(HttpHeaderHelper.CONNECTION);
+ head = httpResponse.headers().get(HttpHeaderHelper.CONNECTION);
boolean isEofTerminated = head != null && HttpHeaderHelper.CLOSE.equalsIgnoreCase(head);
if (cli > 0) {
in = getInputStream();
@@ -486,7 +501,7 @@ public class NettyHttpConduit extends UR
entity = outMessage.get(NettyHttpClientRequest.class);
//reset the buffers
outBuffer.clear();
- outputStream = new ChannelBufferOutputStream(outBuffer);
+ outputStream = new ByteBufOutputStream(outBuffer);
} catch (URISyntaxException e) {
throw new IOException(e);
@@ -511,7 +526,8 @@ public class NettyHttpConduit extends UR
@Override
public void thresholdReached() throws IOException {
- entity.getRequest().setChunked(true);
+ //TODO need to support the chunked version
+ //entity.getRequest().setChunked(true);
}
protected synchronized void setHttpResponse(HttpResponse r) {
@@ -548,5 +564,24 @@ public class NettyHttpConduit extends UR
}
}
+ @Override
+ public void initComplete() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void postShutdown() {
+ // shutdown the conduit
+ this.close();
+
+ }
+
+ @Override
+ public void preShutdown() {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduitFactory.java Thu Aug 15 14:32:02 2013
@@ -66,7 +66,8 @@ public class NettyHttpConduitFactory imp
@Override
public void postShutdown() {
- // TODO Do we need to keep the track of the NettyHttpConduit?
+ // shutdown the group
+
}
public boolean isShutdown() {
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/NettyClientTest.java Thu Aug 15 14:32:02 2013
@@ -86,7 +86,6 @@ public class NettyClientTest extends Abs
JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
factory.setServiceClass(Greeter.class);
factory.setAddress(address);
- //factory.setTransportId("http://cxf.apache.org/transports/http/netty/client");
Greeter greeter = factory.create(Greeter.class);
String response = greeter.greetMe("test");
assertEquals("Get a wrong response", "Hello test", response);
Modified: cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java?rev=1514298&r1=1514297&r2=1514298&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java (original)
+++ cxf/trunk/rt/transports/http-netty/netty-client/src/test/java/org/apache/cxf/transport/http/netty/client/integration/SSLNettyClientTest.java Thu Aug 15 14:32:02 2013
@@ -26,12 +26,15 @@ import java.security.GeneralSecurityExce
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import javax.xml.ws.AsyncHandler;
import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
@@ -41,6 +44,8 @@ import org.apache.cxf.testutil.common.Ab
import org.apache.cxf.transport.http.netty.client.NettyHttpConduit;
import org.apache.hello_world_soap_http.Greeter;
import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -95,6 +100,25 @@ public class SSLNettyClientTest extends
setAddress(g, address);
String response = g.greetMe("test");
assertEquals("Get a wrong response", "Hello test", response);
+
+ GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync("asyncTest", new AsyncHandler<GreetMeResponse>() {
+ public void handleResponse(Response<GreetMeResponse> res) {
+ try {
+ res.get().getResponseType();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ }).get();
+ assertEquals("Hello asyncTest", resp.getResponseType());
+
+ MyLaterResponseHandler handler = new MyLaterResponseHandler();
+ g.greetMeLaterAsync(1000, handler).get();
+ // need to check the result here
+ assertEquals("Hello, finally!", handler.getResponse().getResponseType());
+
}
private static void setupTLS(Greeter port)
@@ -138,5 +162,24 @@ public class SSLNettyClientTest extends
return fac.getKeyManagers();
}
+ private class MyLaterResponseHandler implements AsyncHandler<GreetMeLaterResponse> {
+ GreetMeLaterResponse response;
+ @Override
+ public void handleResponse(Response<GreetMeLaterResponse> res) {
+ try {
+ response = res.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+
+ GreetMeLaterResponse getResponse() {
+ return response;
+ }
+
+ }
+
}