You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ol...@apache.org on 2012/08/01 17:43:28 UTC

svn commit: r1368064 - in /cxf/sandbox/dkulp_async_clients/http-hc/src: main/java/org/apache/cxf/transport/http/asyncclient/ test/resources/

Author: olegk
Date: Wed Aug  1 15:43:28 2012
New Revision: 1368064

URL: http://svn.apache.org/viewvc?rev=1368064&view=rev
Log:
Fixed data corruption caused by thread-safety issues

Added:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java   (with props)
Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1368064&r1=1368063&r2=1368064&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Wed Aug  1 15:43:28 2012
@@ -19,15 +19,11 @@
 
 package org.apache.cxf.transport.http.asyncclient;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -45,22 +41,15 @@ import org.apache.cxf.transports.http.co
 import org.apache.cxf.version.Version;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpException;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpRequest;
 import org.apache.http.HttpResponse;
 import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.entity.ContentType;
 import org.apache.http.nio.ContentDecoder;
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.IOControl;
-import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
-import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
 import org.apache.http.protocol.BasicHttpContext;
-import org.apache.http.protocol.HttpContext;
 
 /**
  * 
@@ -119,8 +108,10 @@ public class AsyncHTTPConduit extends HT
     
     
     class AsyncWrappedOutputStream extends WrappedOutputStream {
-        CXFHttpRequest entity;
-        BasicHttpEntity basicEntity; 
+        final CXFHttpRequest entity;
+        final BasicHttpEntity basicEntity;
+        final SharedInputBuffer inbuf;
+        final SharedOutputBuffer outbuf;
         boolean isAsync;
         
         // Objects for the response
@@ -143,7 +134,11 @@ public class AsyncHTTPConduit extends HT
                   url);
             entity = message.get(CXFHttpRequest.class);
             basicEntity = (BasicHttpEntity)entity.getEntity();
+            HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
+            inbuf = new SharedInputBuffer(4096, allocator);
+            outbuf = new SharedOutputBuffer(4096, allocator);
         }
+        
         protected void setProtocolHeaders() throws IOException {
             Headers h = new Headers(outMessage);
             basicEntity.setContentType(h.determineContentType());
@@ -181,79 +176,77 @@ public class AsyncHTTPConduit extends HT
             basicEntity.setChunked(true);
         }
 
-        synchronized void waitForEncoder() throws IOException {
-            while (encoder == null) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new IOException(e);
-                }
-            }
-        }
-        synchronized void setEncoder(ContentEncoder enc, IOControl ioc) {
-            encoder = enc;
-            requestioctrl = ioc;
-            notifyAll();
-        }
-        
         protected void setupWrappedStream() throws IOException {
                        
-            HttpAsyncResponseConsumer<Object> consumer = new CXFHttpAsyncResponseConsumer();
-            FutureCallback<Object> callback = new FutureCallback<Object>() {
-                public void completed(Object result) {
+            CXFResponseCallback responseCallback = new CXFResponseCallback() {
+
+                @Override
+                public void responseReceived(HttpResponse response) {
+                    setHttpResponse(response);
                 }
+                
+            };
+            
+            HttpAsyncResponseConsumer<Boolean> consumer = new CXFHttpAsyncResponseConsumer(inbuf, 
+                    responseCallback);
+            
+            FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+
+                public void completed(Boolean result) {
+                }
+                
                 public void failed(Exception ex) {
+                    inbuf.shutdown();
+                    outbuf.shutdown();
                 }
                 public void cancelled() {
+                    inbuf.shutdown();
+                    outbuf.shutdown();
                 }
                 
             };
 
-            factory.getRequester()
-                .execute(new CXFHttpAsyncRequestProducer(entity),
+            factory.getRequester().execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
                          consumer,
                          factory.getPool(),
                          new BasicHttpContext(),
                          callback);
+            
             wrappedStream = new OutputStream() {
+                
                 public void write(byte b[], int off, int len) throws IOException {
-                    waitForEncoder();
-                    if (len == 0) {
-                        return;
-                    }
-                    ByteBuffer bb = ByteBuffer.wrap(b, off, len);
-                    while (bb.hasRemaining()) {
-                        int i = encoder.write(bb);
-                        if (i == -1) {
-                            return;
-                        }
-                    }
+                    outbuf.write(b, off, len);
                 }
                 public void write(int b) throws IOException {
-                    write(new byte[] {(byte)b});
+                    outbuf.write(b);
                 }
                 public void close() throws IOException {
-                    waitForEncoder();
-                    requestioctrl.requestInput();
-                    requestioctrl.requestOutput();
-                    encoder.complete();
+                    outbuf.writeCompleted();
                 }
             };
                         
             // If we need to cache for retransmission, store data in a
             // CacheAndWriteOutputStream. Otherwise write directly to the output stream.
             if (cachingForRetransmission) {
-                cachedStream =
-                    new CacheAndWriteOutputStream(wrappedStream);
+                cachedStream = new CacheAndWriteOutputStream(wrappedStream);
                 wrappedStream = cachedStream;
-            } else {
-                wrappedStream = new BufferedOutputStream(wrappedStream, 8192);
             }
         }
+
         protected synchronized void setHttpResponse(HttpResponse r) {
             httpResponse = r;
+            if (isAsync) {
+                //got a response, need to start the response processing now
+                try {
+                    handleResponseOnWorkqueue(false);
+                    isAsync = false; // don't trigger another start on next block. :-)
+                } catch (Exception ex) {
+                    ex.printStackTrace();
+                }
+            }
             notifyAll();
         }
+
         protected synchronized HttpResponse getHttpResponse() throws IOException {
             while (httpResponse == null) {
                 //FIXME get the read timeout
@@ -269,66 +262,53 @@ public class AsyncHTTPConduit extends HT
         protected void handleResponseAsync() throws IOException {
             isAsync = true;
         }
+        
         protected void closeInputStream() throws IOException {
         }
         
-        protected synchronized void setDecoder(ContentDecoder r, IOControl i) {
-            decoder = r;
-            ioctrl = i;
-            if (isAsync) {
-                //got a response, need to start the response processing now
-                try {
-                    handleResponseOnWorkqueue(false);
-                    isAsync = false; // don't trigger another start on next block. :-)
-                } catch (Exception ex) {
-                    ex.printStackTrace();
+        protected synchronized InputStream getInputStream() throws IOException {
+            return new InputStream() {
+
+                @Override
+                public int read() throws IOException {
+                    return inbuf.read();
                 }
-            }
-            notifyAll();
-        }
-        synchronized void waitForDecoder() throws IOException {
-            while (decoder == null) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new IOException();
+
+                @Override
+                public int read(byte[] b) throws IOException {
+                    return inbuf.read(b);
                 }
-            }
-        }
-        protected synchronized InputStream getInputStream() throws IOException {
-            return Channels.newInputStream(new ReadableByteChannel() {
-                public boolean isOpen() {
-                    try {
-                        waitForDecoder();
-                    } catch (IOException e) {
-                        return false;
-                    }
-                    return !decoder.isCompleted();
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    return inbuf.read(b, off, len);
                 }
-                public void close() throws IOException {
-                    waitForDecoder();
-                    ioctrl.requestInput();
+
+                @Override
+                public int available() throws IOException {
+                    return inbuf.available();
                 }
-                public int read(ByteBuffer dst) throws IOException {
-                    waitForDecoder();
-                    int i = 0;
-                    while (i == 0) {
-                        //really should wait for an async event
-                        i = decoder.read(dst);
-                    }
-                    return i;
+
+                @Override
+                public void close() throws IOException {
+                    inbuf.close();
                 }
-            });
+                
+            };
         }
+        
         protected boolean usingProxy() {
             return false;
         }
+        
         protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
             return null;
         }
+        
         protected synchronized int getResponseCode() throws IOException {
             return getHttpResponse().getStatusLine().getStatusCode();
         }
+        
         protected String getResponseMessage() throws IOException {
             return getHttpResponse().getStatusLine().getReasonPhrase();
         }
@@ -342,87 +322,29 @@ public class AsyncHTTPConduit extends HT
                 h.headerMap().put(header.getName(), s);
             } 
         }
+        
         protected void updateResponseHeaders(Message inMessage) {
             Headers h = new Headers(inMessage);
             readHeaders(h);
         }
+        
         protected InputStream getPartialResponse() throws IOException {
             return null;
         }
+        
         protected void updateCookiesBeforeRetransmit() {
             Headers h = new Headers();
             readHeaders(h);
             cookies.readFromHeaders(h);
         }
+        
         protected void retransmitStream() throws IOException {
         }
+
         protected void setupNewConnection(String newURL) throws IOException {
             httpResponse = null;
-            
-        }
-        
-        
-        class CXFHttpAsyncResponseConsumer extends AbstractAsyncResponseConsumer<Object> {
-            protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {
-                setHttpResponse(response);
-            }
-            protected Object buildResult(HttpContext context) throws Exception {
-                return Boolean.TRUE;
-            }
-            protected void onContentReceived(ContentDecoder dec, IOControl ioc) 
-                throws IOException {
-                setDecoder(dec, ioc);
-                if (!dec.isCompleted()) {
-                    ioctrl.suspendInput();
-                }
-            }
-            protected void onEntityEnclosed(HttpEntity e, ContentType contentType) throws IOException {
-                //nothing
-            }
-            protected void releaseResources() {
-                //decoder = null;
-                //ioctrl = null;
-            }
         }
         
-        class CXFHttpAsyncRequestProducer implements HttpAsyncRequestProducer {
-            CXFHttpRequest entity;
-            public CXFHttpAsyncRequestProducer(CXFHttpRequest e) {
-                entity = e;
-            }
-            public void close() throws IOException {
-            }
-            public HttpHost getTarget() {
-                int i = entity.getURI().getPort();
-                if (i == -1) {
-                    i = 80;
-                }
-                HttpHost host = new HttpHost(entity.getURI().getHost(),
-                                    i,
-                                    entity.getURI().getScheme());
-                return host;
-            }
-            public HttpRequest generateRequest() throws IOException, HttpException {
-                return entity;
-            }
-            public void produceContent(ContentEncoder enc, IOControl ioc) throws IOException {
-                setEncoder(enc, ioc);
-                if (!enc.isCompleted()) { 
-                    ioc.suspendOutput();
-                }
-            }
-            public void requestCompleted(HttpContext context) {
-            }
-            public void failed(Exception ex) {
-                ex.printStackTrace();
-            }
-            public boolean isRepeatable() {
-                return false;
-            }
-            public void resetRequest() throws IOException {
-            }
-        }
-
     }
     
 }

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java?rev=1368064&r1=1368063&r2=1368064&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java Wed Aug  1 15:43:28 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.http.HttpException;
 import org.apache.http.HttpResponse;
-import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.nio.ContentDecoder;
 import org.apache.http.nio.IOControl;
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
@@ -32,15 +31,16 @@ import org.apache.http.protocol.HttpCont
 public class CXFHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<Boolean> {
 
     private final SharedInputBuffer buf;
-    private final BasicFuture<HttpResponse> future;
+    private final CXFResponseCallback responseCallback;
     
     private volatile boolean completed;
     private volatile Exception exception;
     
-    public CXFHttpAsyncResponseConsumer(final SharedInputBuffer buf, final BasicFuture<HttpResponse> future) {
+    public CXFHttpAsyncResponseConsumer(
+            final SharedInputBuffer buf, final CXFResponseCallback responseCallback) {
         super();
         this.buf = buf;
-        this.future = future;
+        this.responseCallback = responseCallback;
     }
 
     @Override
@@ -57,7 +57,7 @@ public class CXFHttpAsyncResponseConsume
 
     @Override
     public void responseReceived(final HttpResponse response) throws IOException, HttpException {
-        future.completed(response);
+        responseCallback.responseReceived(response);
     }
 
     @Override
@@ -76,7 +76,6 @@ public class CXFHttpAsyncResponseConsume
         completed = true;
         exception = ex;
         buf.shutdown();
-        future.failed(ex);
     }
 
     @Override

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java?rev=1368064&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java Wed Aug  1 15:43:28 2012
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import org.apache.http.HttpResponse;
+
+interface CXFResponseCallback {
+
+    void responseReceived(HttpResponse response);
+    
+}

Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFResponseCallback.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties?rev=1368064&r1=1368063&r2=1368064&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/test/resources/log4j.properties Wed Aug  1 15:43:28 2012
@@ -4,5 +4,5 @@ log4j.appender.stdout.layout=org.apache.
 log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
  
 log4j.rootLogger=WARN, stdout
-log4j.logger.org.apache.http=WARN
- 
\ No newline at end of file
+#log4j.logger.org.apache.http=DEBUG
+