You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ol...@apache.org on 2012/08/01 17:43:28 UTC
svn commit: r1368064 - in /cxf/sandbox/dkulp_async_clients/http-hc/src:
main/java/org/apache/cxf/transport/http/asyncclient/ test/resources/
Author: olegk
Date: Wed Aug 1 15:43:28 2012
New Revision: 1368064
URL: http://svn.apache.org/viewvc?rev=1368064&view=rev
Log:
Fixed data corruption caused by thread-safety issues
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java (with props)
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1368064&r1=1368063&r2=1368064&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Wed Aug 1 15:43:28 2012
@@ -19,15 +19,11 @@
package org.apache.cxf.transport.http.asyncclient;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -45,22 +41,15 @@ import org.apache.cxf.transports.http.co
import org.apache.cxf.version.Version;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.entity.ContentType;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
-import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
-import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.protocol.BasicHttpContext;
-import org.apache.http.protocol.HttpContext;
/**
*
@@ -119,8 +108,10 @@ public class AsyncHTTPConduit extends HT
class AsyncWrappedOutputStream extends WrappedOutputStream {
- CXFHttpRequest entity;
- BasicHttpEntity basicEntity;
+ final CXFHttpRequest entity;
+ final BasicHttpEntity basicEntity;
+ final SharedInputBuffer inbuf;
+ final SharedOutputBuffer outbuf;
boolean isAsync;
// Objects for the response
@@ -143,7 +134,11 @@ public class AsyncHTTPConduit extends HT
url);
entity = message.get(CXFHttpRequest.class);
basicEntity = (BasicHttpEntity)entity.getEntity();
+ HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
+ inbuf = new SharedInputBuffer(4096, allocator);
+ outbuf = new SharedOutputBuffer(4096, allocator);
}
+
protected void setProtocolHeaders() throws IOException {
Headers h = new Headers(outMessage);
basicEntity.setContentType(h.determineContentType());
@@ -181,79 +176,77 @@ public class AsyncHTTPConduit extends HT
basicEntity.setChunked(true);
}
- synchronized void waitForEncoder() throws IOException {
- while (encoder == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
- }
- synchronized void setEncoder(ContentEncoder enc, IOControl ioc) {
- encoder = enc;
- requestioctrl = ioc;
- notifyAll();
- }
-
protected void setupWrappedStream() throws IOException {
- HttpAsyncResponseConsumer<Object> consumer = new CXFHttpAsyncResponseConsumer();
- FutureCallback<Object> callback = new FutureCallback<Object>() {
- public void completed(Object result) {
+ CXFResponseCallback responseCallback = new CXFResponseCallback() {
+
+ @Override
+ public void responseReceived(HttpResponse response) {
+ setHttpResponse(response);
}
+
+ };
+
+ HttpAsyncResponseConsumer<Boolean> consumer = new CXFHttpAsyncResponseConsumer(inbuf,
+ responseCallback);
+
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+
+ public void completed(Boolean result) {
+ }
+
public void failed(Exception ex) {
+ inbuf.shutdown();
+ outbuf.shutdown();
}
public void cancelled() {
+ inbuf.shutdown();
+ outbuf.shutdown();
}
};
- factory.getRequester()
- .execute(new CXFHttpAsyncRequestProducer(entity),
+ factory.getRequester().execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
consumer,
factory.getPool(),
new BasicHttpContext(),
callback);
+
wrappedStream = new OutputStream() {
+
public void write(byte b[], int off, int len) throws IOException {
- waitForEncoder();
- if (len == 0) {
- return;
- }
- ByteBuffer bb = ByteBuffer.wrap(b, off, len);
- while (bb.hasRemaining()) {
- int i = encoder.write(bb);
- if (i == -1) {
- return;
- }
- }
+ outbuf.write(b, off, len);
}
public void write(int b) throws IOException {
- write(new byte[] {(byte)b});
+ outbuf.write(b);
}
public void close() throws IOException {
- waitForEncoder();
- requestioctrl.requestInput();
- requestioctrl.requestOutput();
- encoder.complete();
+ outbuf.writeCompleted();
}
};
// If we need to cache for retransmission, store data in a
// CacheAndWriteOutputStream. Otherwise write directly to the output stream.
if (cachingForRetransmission) {
- cachedStream =
- new CacheAndWriteOutputStream(wrappedStream);
+ cachedStream = new CacheAndWriteOutputStream(wrappedStream);
wrappedStream = cachedStream;
- } else {
- wrappedStream = new BufferedOutputStream(wrappedStream, 8192);
}
}
+
protected synchronized void setHttpResponse(HttpResponse r) {
httpResponse = r;
+ if (isAsync) {
+ //got a response, need to start the response processing now
+ try {
+ handleResponseOnWorkqueue(false);
+ isAsync = false; // don't trigger another start on next block. :-)
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
notifyAll();
}
+
protected synchronized HttpResponse getHttpResponse() throws IOException {
while (httpResponse == null) {
//FIXME get the read timeout
@@ -269,66 +262,53 @@ public class AsyncHTTPConduit extends HT
protected void handleResponseAsync() throws IOException {
isAsync = true;
}
+
protected void closeInputStream() throws IOException {
}
- protected synchronized void setDecoder(ContentDecoder r, IOControl i) {
- decoder = r;
- ioctrl = i;
- if (isAsync) {
- //got a response, need to start the response processing now
- try {
- handleResponseOnWorkqueue(false);
- isAsync = false; // don't trigger another start on next block. :-)
- } catch (Exception ex) {
- ex.printStackTrace();
+ protected synchronized InputStream getInputStream() throws IOException {
+ return new InputStream() {
+
+ @Override
+ public int read() throws IOException {
+ return inbuf.read();
}
- }
- notifyAll();
- }
- synchronized void waitForDecoder() throws IOException {
- while (decoder == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new IOException();
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return inbuf.read(b);
}
- }
- }
- protected synchronized InputStream getInputStream() throws IOException {
- return Channels.newInputStream(new ReadableByteChannel() {
- public boolean isOpen() {
- try {
- waitForDecoder();
- } catch (IOException e) {
- return false;
- }
- return !decoder.isCompleted();
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return inbuf.read(b, off, len);
}
- public void close() throws IOException {
- waitForDecoder();
- ioctrl.requestInput();
+
+ @Override
+ public int available() throws IOException {
+ return inbuf.available();
}
- public int read(ByteBuffer dst) throws IOException {
- waitForDecoder();
- int i = 0;
- while (i == 0) {
- //really should wait for an async event
- i = decoder.read(dst);
- }
- return i;
+
+ @Override
+ public void close() throws IOException {
+ inbuf.close();
}
- });
+
+ };
}
+
protected boolean usingProxy() {
return false;
}
+
protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
return null;
}
+
protected synchronized int getResponseCode() throws IOException {
return getHttpResponse().getStatusLine().getStatusCode();
}
+
protected String getResponseMessage() throws IOException {
return getHttpResponse().getStatusLine().getReasonPhrase();
}
@@ -342,87 +322,29 @@ public class AsyncHTTPConduit extends HT
h.headerMap().put(header.getName(), s);
}
}
+
protected void updateResponseHeaders(Message inMessage) {
Headers h = new Headers(inMessage);
readHeaders(h);
}
+
protected InputStream getPartialResponse() throws IOException {
return null;
}
+
protected void updateCookiesBeforeRetransmit() {
Headers h = new Headers();
readHeaders(h);
cookies.readFromHeaders(h);
}
+
protected void retransmitStream() throws IOException {
}
+
protected void setupNewConnection(String newURL) throws IOException {
httpResponse = null;
-
- }
-
-
- class CXFHttpAsyncResponseConsumer extends AbstractAsyncResponseConsumer<Object> {
- protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {
- setHttpResponse(response);
- }
- protected Object buildResult(HttpContext context) throws Exception {
- return Boolean.TRUE;
- }
- protected void onContentReceived(ContentDecoder dec, IOControl ioc)
- throws IOException {
- setDecoder(dec, ioc);
- if (!dec.isCompleted()) {
- ioctrl.suspendInput();
- }
- }
- protected void onEntityEnclosed(HttpEntity e, ContentType contentType) throws IOException {
- //nothing
- }
- protected void releaseResources() {
- //decoder = null;
- //ioctrl = null;
- }
}
- class CXFHttpAsyncRequestProducer implements HttpAsyncRequestProducer {
- CXFHttpRequest entity;
- public CXFHttpAsyncRequestProducer(CXFHttpRequest e) {
- entity = e;
- }
- public void close() throws IOException {
- }
- public HttpHost getTarget() {
- int i = entity.getURI().getPort();
- if (i == -1) {
- i = 80;
- }
- HttpHost host = new HttpHost(entity.getURI().getHost(),
- i,
- entity.getURI().getScheme());
- return host;
- }
- public HttpRequest generateRequest() throws IOException, HttpException {
- return entity;
- }
- public void produceContent(ContentEncoder enc, IOControl ioc) throws IOException {
- setEncoder(enc, ioc);
- if (!enc.isCompleted()) {
- ioc.suspendOutput();
- }
- }
- public void requestCompleted(HttpContext context) {
- }
- public void failed(Exception ex) {
- ex.printStackTrace();
- }
- public boolean isRepeatable() {
- return false;
- }
- public void resetRequest() throws IOException {
- }
- }
-
}
}
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java?rev=1368064&r1=1368063&r2=1368064&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java Wed Aug 1 15:43:28 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
-import org.apache.http.concurrent.BasicFuture;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
@@ -32,15 +31,16 @@ import org.apache.http.protocol.HttpCont
public class CXFHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<Boolean> {
private final SharedInputBuffer buf;
- private final BasicFuture<HttpResponse> future;
+ private final CXFResponseCallback responseCallback;
private volatile boolean completed;
private volatile Exception exception;
- public CXFHttpAsyncResponseConsumer(final SharedInputBuffer buf, final BasicFuture<HttpResponse> future) {
+ public CXFHttpAsyncResponseConsumer(
+ final SharedInputBuffer buf, final CXFResponseCallback responseCallback) {
super();
this.buf = buf;
- this.future = future;
+ this.responseCallback = responseCallback;
}
@Override
@@ -57,7 +57,7 @@ public class CXFHttpAsyncResponseConsume
@Override
public void responseReceived(final HttpResponse response) throws IOException, HttpException {
- future.completed(response);
+ responseCallback.responseReceived(response);
}
@Override
@@ -76,7 +76,6 @@ public class CXFHttpAsyncResponseConsume
completed = true;
exception = ex;
buf.shutdown();
- future.failed(ex);
}
@Override
Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java?rev=1368064&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java Wed Aug 1 15:43:28 2012
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import org.apache.http.HttpResponse;
+
+interface CXFResponseCallback {
+
+ void responseReceived(HttpResponse response);
+
+}
Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties?rev=1368064&r1=1368063&r2=1368064&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties Wed Aug 1 15:43:28 2012
@@ -4,5 +4,5 @@ log4j.appender.stdout.layout=org.apache.
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
log4j.rootLogger=WARN, stdout
-log4j.logger.org.apache.http=WARN
-
\ No newline at end of file
+#log4j.logger.org.apache.http=DEBUG
+