You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/11/24 01:45:38 UTC

svn commit: r1413111 - /manifoldcf/branches/CONNECTORS-120/connectors/sharepoint/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/sharepoint/CommonsHTTPSender.java

Author: kwright
Date: Sat Nov 24 00:45:37 2012
New Revision: 1413111

URL: http://svn.apache.org/viewvc?rev=1413111&view=rev
Log:
Convert file-backed http access to streaming http access.

Modified:
    manifoldcf/branches/CONNECTORS-120/connectors/sharepoint/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/sharepoint/CommonsHTTPSender.java

Modified: manifoldcf/branches/CONNECTORS-120/connectors/sharepoint/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/sharepoint/CommonsHTTPSender.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/sharepoint/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/sharepoint/CommonsHTTPSender.java?rev=1413111&r1=1413110&r2=1413111&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/sharepoint/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/sharepoint/CommonsHTTPSender.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/sharepoint/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/sharepoint/CommonsHTTPSender.java Sat Nov 24 00:45:37 2012
@@ -17,6 +17,8 @@
 */
 package org.apache.manifoldcf.crawler.connectors.sharepoint;
 
+import org.apache.manifoldcf.core.common.XThreadInputStream;
+
 import org.apache.axis.AxisFault;
 import org.apache.axis.Constants;
 import org.apache.axis.Message;
@@ -47,6 +49,12 @@ import org.apache.http.ProtocolVersion;
 import org.apache.http.util.EntityUtils;
 import org.apache.http.message.BasicHeader;
 
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.client.RedirectException;
+import org.apache.http.client.CircularRedirectException;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.HttpException;
+
 import org.apache.commons.logging.Log;
 
 import javax.xml.soap.MimeHeader;
@@ -69,7 +77,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 
 /* Class to use httpcomponents to communicate with a SOAP server.
 * I've replaced the original rather complicated class with a much simpler one that
@@ -90,165 +100,6 @@ public class CommonsHTTPSender extends B
     this.clientProperties = CommonsHTTPClientPropertiesFactory.create();
   }
 
-  protected static class ExecuteMethodThread extends Thread
-  {
-    protected final HttpClient httpClient;
-    protected final String targetURL;
-    protected final MessageContext msgContext;
-
-    protected Throwable exception = null;
-    protected int returnCode = 0;
-
-    public ExecuteMethodThread( HttpClient httpClient, String targetURL, MessageContext msgContext )
-    {
-      super();
-      setDaemon(true);
-      this.httpClient = httpClient;
-      this.targetURL = targetURL;
-      this.msgContext = msgContext;
-    }
-
-    public void run()
-    {
-      try
-      {
-        boolean posting = true;
-        // If we're SOAP 1.2, allow the web method to be set from the
-        // MessageContext.
-        if (msgContext.getSOAPConstants() == SOAPConstants.SOAP12_CONSTANTS) {
-          String webMethod = msgContext.getStrProp(SOAP12Constants.PROP_WEBMETHOD);
-          if (webMethod != null) {
-            posting = webMethod.equals(HTTPConstants.HEADER_POST);
-          }
-        }
-
-        boolean http10 = false;
-        String httpVersion = msgContext.getStrProp(MessageContext.HTTP_TRANSPORT_VERSION);
-        if (httpVersion != null) {
-          if (httpVersion.equals(HTTPConstants.HEADER_PROTOCOL_V10)) {
-            http10 = true;
-          }
-          // assume 1.1
-        }
-
-        HttpRequestBase method;
-        if (posting) {
-          HttpPost postMethod = new HttpPost(targetURL);
-          
-          // set false as default, addContetInfo can overwrite
-          HttpProtocolParams.setUseExpectContinue(postMethod.getParams(),false);
-
-          Message reqMessage = msgContext.getRequestMessage();
-          
-          boolean httpChunkStream = addContextInfo(postMethod, msgContext);
-
-          HttpEntity requestEntity = null;
-          requestEntity = new MessageRequestEntity(reqMessage, httpChunkStream,
-            http10 || !httpChunkStream);
-          postMethod.setEntity(requestEntity);
-          method = postMethod;
-        } else {
-          method = new HttpGet(targetURL);
-        }
-        
-        if (http10)
-          HttpProtocolParams.setVersion(method.getParams(),new ProtocolVersion("HTTP",1,0));
-
-        // Try block to insure that the connection gets cleaned up
-        try
-        {
-          // Begin the fetch
-          HttpResponse response = httpClient.execute(method);
-
-          returnCode = response.getStatusLine().getStatusCode();
-          
-          String contentType =
-            getHeader(response, HTTPConstants.HEADER_CONTENT_TYPE);
-          String contentLocation =
-            getHeader(response, HTTPConstants.HEADER_CONTENT_LOCATION);
-          String contentLength =
-            getHeader(response, HTTPConstants.HEADER_CONTENT_LENGTH);
-
-          if ((returnCode > 199) && (returnCode < 300)) {
-
-            // SOAP return is OK - so fall through
-          } else if (msgContext.getSOAPConstants() ==
-            SOAPConstants.SOAP12_CONSTANTS) {
-            // For now, if we're SOAP 1.2, fall through, since the range of
-            // valid result codes is much greater
-          } else if ((contentType != null) && !contentType.equals("text/html")
-            && ((returnCode > 499) && (returnCode < 600))) {
-
-            // SOAP Fault should be in here - so fall through
-          } else {
-            String statusMessage = response.getStatusLine().toString();
-            AxisFault fault = new AxisFault("HTTP",
-              "(" + returnCode + ")"
-            + statusMessage, null,
-              null);
-
-            fault.setFaultDetailString(
-              Messages.getMessage("return01",
-              "" + returnCode,
-              getResponseBodyAsString(response)));
-            fault.addFaultDetail(Constants.QNAME_FAULTDETAIL_HTTPERRORCODE,
-              Integer.toString(returnCode));
-            throw fault;
-          }
-
-          // Transfer to a temporary file.  If we stream it, we may wind up waiting on the socket outside this thread.
-          InputStream releaseConnectionOnCloseStream = new FileBackedInputStream(response.getEntity().getContent());
-
-          Header contentEncoding =
-            response.getFirstHeader(HTTPConstants.HEADER_CONTENT_ENCODING);
-          if (contentEncoding != null) {
-            AxisFault fault = new AxisFault("HTTP",
-              "unsupported content-encoding of '"
-            + contentEncoding.getValue()
-            + "' found", null, null);
-            throw fault;
-          }
-
-          Message outMsg = new Message(releaseConnectionOnCloseStream,
-            false, contentType, contentLocation);
-          
-          // Transfer HTTP headers of HTTP message to MIME headers of SOAP message
-          Header[] responseHeaders = response.getAllHeaders();
-          MimeHeaders responseMimeHeaders = outMsg.getMimeHeaders();
-          for (int i = 0; i < responseHeaders.length; i++) {
-            Header responseHeader = responseHeaders[i];
-            responseMimeHeaders.addHeader(responseHeader.getName(),
-              responseHeader.getValue());
-          }
-          outMsg.setMessageType(Message.RESPONSE);
-          
-          // Put the message in the message context.
-          msgContext.setResponseMessage(outMsg);
-        }
-        finally
-        {
-          // Consumes and closes the stream, releasing the connection
-          method.abort();
-        }
-
-      }
-      catch (Throwable e)
-      {
-        this.exception = e;
-      }
-    }
-
-    public Throwable getException()
-    {
-      return exception;
-    }
-
-    public int getResponse()
-    {
-      return returnCode;
-    }
-  }
-
   /**
   * invoke creates a socket connection, sends the request SOAP message and then
   * reads the response SOAP message back from the SOAP server
@@ -274,6 +125,133 @@ public class CommonsHTTPSender extends B
       // Get the HttpClient
       HttpClient httpClient = (HttpClient)msgContext.getProperty(SPSProxyHelper.HTTPCLIENT_PROPERTY);
 
+      boolean posting = true;
+      // If we're SOAP 1.2, allow the web method to be set from the
+      // MessageContext.
+      if (msgContext.getSOAPConstants() == SOAPConstants.SOAP12_CONSTANTS) {
+        String webMethod = msgContext.getStrProp(SOAP12Constants.PROP_WEBMETHOD);
+        if (webMethod != null) {
+          posting = webMethod.equals(HTTPConstants.HEADER_POST);
+        }
+      }
+
+      boolean http10 = false;
+      String httpVersion = msgContext.getStrProp(MessageContext.HTTP_TRANSPORT_VERSION);
+      if (httpVersion != null) {
+        if (httpVersion.equals(HTTPConstants.HEADER_PROTOCOL_V10)) {
+          http10 = true;
+        }
+        // assume 1.1
+      }
+
+      HttpRequestBase method;
+        
+      if (posting) {
+        HttpPost postMethod = new HttpPost(targetURL.toString());
+          
+        // set false as default, addContextInfo can overwrite
+        HttpProtocolParams.setUseExpectContinue(postMethod.getParams(),false);
+
+        Message reqMessage = msgContext.getRequestMessage();
+          
+        boolean httpChunkStream = addContextInfo(postMethod, msgContext);
+
+        HttpEntity requestEntity = null;
+        requestEntity = new MessageRequestEntity(reqMessage, httpChunkStream,
+          http10 || !httpChunkStream);
+        postMethod.setEntity(requestEntity);
+        method = postMethod;
+      } else {
+        method = new HttpGet(targetURL.toString());
+      }
+        
+      if (http10)
+        HttpProtocolParams.setVersion(method.getParams(),new ProtocolVersion("HTTP",1,0));
+
+      BackgroundHTTPThread methodThread = new BackgroundHTTPThread(httpClient,method);
+      methodThread.start();
+      try
+      {
+        int returnCode = methodThread.getResponseCode();
+          
+        String contentType =
+          getHeader(methodThread, HTTPConstants.HEADER_CONTENT_TYPE);
+        String contentLocation =
+          getHeader(methodThread, HTTPConstants.HEADER_CONTENT_LOCATION);
+        String contentLength =
+          getHeader(methodThread, HTTPConstants.HEADER_CONTENT_LENGTH);
+        
+        if ((returnCode > 199) && (returnCode < 300)) {
+
+          // SOAP return is OK - so fall through
+        } else if (msgContext.getSOAPConstants() ==
+          SOAPConstants.SOAP12_CONSTANTS) {
+          // For now, if we're SOAP 1.2, fall through, since the range of
+          // valid result codes is much greater
+        } else if ((contentType != null) && !contentType.equals("text/html")
+          && ((returnCode > 499) && (returnCode < 600))) {
+
+          // SOAP Fault should be in here - so fall through
+        } else {
+          String statusMessage = methodThread.getResponseStatus();
+          AxisFault fault = new AxisFault("HTTP",
+            "(" + returnCode + ")"
+          + statusMessage, null,
+            null);
+
+          fault.setFaultDetailString(
+            Messages.getMessage("return01",
+            "" + returnCode,
+            getResponseBodyAsString(methodThread)));
+          fault.addFaultDetail(Constants.QNAME_FAULTDETAIL_HTTPERRORCODE,
+            Integer.toString(returnCode));
+          throw fault;
+        }
+
+        String contentEncoding =
+         methodThread.getFirstHeader(HTTPConstants.HEADER_CONTENT_ENCODING);
+        if (contentEncoding != null) {
+          AxisFault fault = new AxisFault("HTTP",
+            "unsupported content-encoding of '"
+          + contentEncoding
+          + "' found", null, null);
+          throw fault;
+        }
+
+        Map<String,List<String>> responseHeaders = methodThread.getResponseHeaders();
+
+        InputStream dataStream = methodThread.getSafeInputStream();
+
+        Message outMsg = new Message(new BackgroundInputStream(methodThread,dataStream),
+          false, contentType, contentLocation);
+          
+        // Transfer HTTP headers of HTTP message to MIME headers of SOAP message
+        MimeHeaders responseMimeHeaders = outMsg.getMimeHeaders();
+        for (String name : responseHeaders.keySet())
+        {
+          List<String> values = responseHeaders.get(name);
+          for (String value : values) {
+            responseMimeHeaders.addHeader(name,value);
+          }
+        }
+        outMsg.setMessageType(Message.RESPONSE);
+          
+        // Put the message in the message context.
+        msgContext.setResponseMessage(outMsg);
+        
+        // Pass off the method thread to the stream for closure
+        methodThread = null;
+      }
+      finally
+      {
+        if (methodThread != null)
+        {
+          methodThread.abort();
+          methodThread.finishUp();
+        }
+      }
+
+/*
       ExecuteMethodThread t = new ExecuteMethodThread(httpClient,targetURL.toString(),msgContext);
       try
       {
@@ -295,7 +273,7 @@ public class CommonsHTTPSender extends B
         t.interrupt();
         throw e;
       }
-
+*/
       /*
       if (log.isDebugEnabled()) {
         if (null == contentLength) {
@@ -401,20 +379,20 @@ public class CommonsHTTPSender extends B
     return httpChunkStream;
   }
 
-  private static String getHeader(HttpResponse response, String headerName) {
-    Header header = response.getFirstHeader(headerName);
-    return (header == null) ? null : header.getValue().trim();
+  private static String getHeader(BackgroundHTTPThread methodThread, String headerName)
+    throws IOException, InterruptedException, HttpException {
+    String header = methodThread.getFirstHeader(headerName);
+    return (header == null) ? null : header.trim();
   }
 
-  private static String getResponseBodyAsString(HttpResponse httpResponse)
-    throws IOException {
-    HttpEntity entity = httpResponse.getEntity();
-    if (entity != null)
+  private static String getResponseBodyAsString(BackgroundHTTPThread methodThread)
+    throws IOException, InterruptedException, HttpException {
+    InputStream is = methodThread.getSafeInputStream();
+    if (is != null)
     {
-      InputStream is = entity.getContent();
       try
       {
-        String charSet = EntityUtils.getContentCharSet(entity);
+        String charSet = methodThread.getCharSet();
         if (charSet == null)
           charSet = "utf-8";
         char[] buffer = new char[65536];
@@ -444,64 +422,109 @@ public class CommonsHTTPSender extends B
     return "";
   }
   
-  private static class FileBackedInputStream extends InputStream {
+  private static class MessageRequestEntity implements HttpEntity {
+
+    private final Message message;
+    private final boolean httpChunkStream; //Use HTTP chunking or not.
+    private final boolean contentLengthNeeded;
+
+    public MessageRequestEntity(Message message, boolean httpChunkStream, boolean contentLengthNeeded) {
+      this.message = message;
+      this.httpChunkStream = httpChunkStream;
+      this.contentLengthNeeded = contentLengthNeeded;
+    }
+
+    @Override
+    public boolean isChunked() {
+      return httpChunkStream;
+    }
     
-    private InputStream fileInputStream = null;
-    private File file = null;
+    @Override
+    public void consumeContent()
+      throws IOException {
+      EntityUtils.consume(this);
+    }
     
-    public FileBackedInputStream(InputStream is)
-      throws IOException
-    {
-      File readyToOpenFile = null;
-      // Create a file and read into it
-      File tempFile = File.createTempFile("__shp__",".tmp");
-      try
-      {
-        // Open the output stream
-        OutputStream os = new FileOutputStream(tempFile);
-        try
-        {
-          byte[] buffer = new byte[65536];
-          while (true)
-          {
-            int amt = is.read(buffer);
-            if (amt == -1)
-              break;
-            os.write(buffer,0,amt);
-          }
-        }
-        finally
-        {
-          os.close();
-        }
-        readyToOpenFile = tempFile;
-        tempFile = null;
-      }
-      finally
-      {
-        if (tempFile != null)
-          tempFile.delete();
-      }
-      
-      try
-      {
-        fileInputStream = new FileInputStream(readyToOpenFile);
-        file = readyToOpenFile;
-        readyToOpenFile = null;
+    @Override
+    public boolean isRepeatable() {
+      return true;
+    }
+
+    @Override
+    public boolean isStreaming() {
+      return false;
+    }
+    
+    @Override
+    public InputStream getContent()
+      throws IOException, IllegalStateException {
+      // MHL
+      return null;
+    }
+    
+    @Override
+    public void writeTo(OutputStream out)
+      throws IOException {
+      try {
+        this.message.writeTo(out);
+      } catch (SOAPException e) {
+        throw new IOException(e.getMessage());
       }
-      finally
-      {
-        if (readyToOpenFile != null)
-          readyToOpenFile.delete();
+    }
+
+    @Override
+    public long getContentLength() {
+      if (contentLengthNeeded) {
+        try {
+          return message.getContentLength();
+        } catch (Exception e) {
+        }
       }
+      // Unknown (chunked) length
+      return -1L;
+    }
+
+    @Override
+    public Header getContentType() {
+      return null; // a separate header is added
+    }
+
+    @Override
+    public Header getContentEncoding() {
+      return null;
+    }
+  }
+
+  /** This input stream wraps a background http transaction thread, so that
+  * the thread is ended when the stream is closed.
+  */
+  private static class BackgroundInputStream extends InputStream {
+    
+    private BackgroundHTTPThread methodThread = null;
+    private InputStream xThreadInputStream = null;
+    
+    /** Construct an http transaction stream.  The stream is driven by a background
+    * thread, whose existence is tied to this class.  The sequence of activity that
+    * this class expects is as follows:
+    * (1) Construct the httpclient and request object and initialize them
+    * (2) Construct a background method thread, and start it
+    * (3) If the response calls for it, call this constructor, and put the resulting stream
+    *    into the message response
+    * (4) Otherwise, terminate the background method thread in the standard manner
+    *    (abort() then finishUp()), being sure NOT to also wrap it in this stream
+    */
+    public BackgroundInputStream(BackgroundHTTPThread methodThread, InputStream xThreadInputStream)
+    {
+      this.methodThread = methodThread;
+      this.xThreadInputStream = xThreadInputStream;
     }
     
     @Override
     public int available()
       throws IOException
     {
-      if (fileInputStream != null)
-        return fileInputStream.available();
+      if (xThreadInputStream != null)
+        return xThreadInputStream.available();
       return super.available();
     }
     
@@ -509,29 +532,37 @@ public class CommonsHTTPSender extends B
     public void close()
       throws IOException
     {
-      IOException exception = null;
       try
       {
-        if (fileInputStream != null)
-          fileInputStream.close();
+        if (xThreadInputStream != null)
+        {
+          xThreadInputStream.close();
+          xThreadInputStream = null;
+        }
       }
-      catch (IOException e)
+      finally
       {
-        exception = e;
+        if (methodThread != null)
+        {
+          methodThread.abort();
+          try
+          {
+            methodThread.finishUp();
+          }
+          catch (InterruptedException e)
+          {
+            throw new InterruptedIOException(e.getMessage());
+          }
+          methodThread = null;
+        }
       }
-      fileInputStream = null;
-      if (file != null)
-        file.delete();
-      file = null;
-      if (exception != null)
-        throw exception;
     }
     
     @Override
     public void mark(int readlimit)
     {
-      if (fileInputStream != null)
-        fileInputStream.mark(readlimit);
+      if (xThreadInputStream != null)
+        xThreadInputStream.mark(readlimit);
       else
         super.mark(readlimit);
     }
@@ -540,8 +571,8 @@ public class CommonsHTTPSender extends B
     public void reset()
       throws IOException
     {
-      if (fileInputStream != null)
-        fileInputStream.reset();
+      if (xThreadInputStream != null)
+        xThreadInputStream.reset();
       else
         super.reset();
     }
@@ -549,8 +580,8 @@ public class CommonsHTTPSender extends B
     @Override
     public boolean markSupported()
     {
-      if (fileInputStream != null)
-        return fileInputStream.markSupported();
+      if (xThreadInputStream != null)
+        return xThreadInputStream.markSupported();
       return super.markSupported();
     }
     
@@ -558,8 +589,8 @@ public class CommonsHTTPSender extends B
     public long skip(long n)
       throws IOException
     {
-      if (fileInputStream != null)
-        return fileInputStream.skip(n);
+      if (xThreadInputStream != null)
+        return xThreadInputStream.skip(n);
       return super.skip(n);
     }
     
@@ -567,8 +598,8 @@ public class CommonsHTTPSender extends B
     public int read(byte[] b, int off, int len)
       throws IOException
     {
-      if (fileInputStream != null)
-        return fileInputStream.read(b,off,len);
+      if (xThreadInputStream != null)
+        return xThreadInputStream.read(b,off,len);
       return super.read(b,off,len);
     }
 
@@ -576,8 +607,8 @@ public class CommonsHTTPSender extends B
     public int read(byte[] b)
       throws IOException
     {
-      if (fileInputStream != null)
-        return fileInputStream.read(b);
+      if (xThreadInputStream != null)
+        return xThreadInputStream.read(b);
       return super.read(b);
     }
     
@@ -585,84 +616,328 @@ public class CommonsHTTPSender extends B
     public int read()
       throws IOException
     {
-      if (fileInputStream != null)
-        return fileInputStream.read();
+      if (xThreadInputStream != null)
+        return xThreadInputStream.read();
       return -1;
     }
     
   }
-  
-  private static class MessageRequestEntity implements HttpEntity {
 
-    private final Message message;
-    private final boolean httpChunkStream; //Use HTTP chunking or not.
-    private final boolean contentLengthNeeded;
+  /** This thread does the actual socket communication with the server.
+  * It's set up so that it can be abandoned at shutdown time.
+  *
+  * The way it works is as follows:
+  * - it starts the transaction
+  * - it receives the response, and saves that for the calling class to inspect
+  * - it transfers the data part to an input stream provided to the calling class
+  * - it shuts the connection down
+  *
+  * If there is an error, the sequence is aborted, and an exception is recorded
+  * for the calling class to examine.
+  *
+  * The calling class basically accepts the sequence above.  It starts the
+  * thread, and tries to get a response code.  If instead an exception is seen,
+  * the exception is thrown up the stack.
+  */
+  protected static class BackgroundHTTPThread extends Thread
+  {
+    /** Client and method, all preconfigured */
+    protected final HttpClient httpClient;
+    protected final HttpRequestBase executeMethod;
+    
+    protected HttpResponse response = null;
+    protected Throwable responseException = null;
+    protected XThreadInputStream threadStream = null;
+    protected String charSet = null;
+    protected boolean streamCreated = false;
+    protected Throwable streamException = null;
+    protected boolean abortThread = false;
 
-    public MessageRequestEntity(Message message, boolean httpChunkStream, boolean contentLengthNeeded) {
-      this.message = message;
-      this.httpChunkStream = httpChunkStream;
-      this.contentLengthNeeded = contentLengthNeeded;
+    protected Throwable shutdownException = null;
+
+    protected Throwable generalException = null;
+    
+    public BackgroundHTTPThread(HttpClient httpClient, HttpRequestBase executeMethod)
+    {
+      super();
+      setDaemon(true);
+      this.httpClient = httpClient;
+      this.executeMethod = executeMethod;
     }
 
-    @Override
-    public boolean isChunked() {
-      return httpChunkStream;
+    public void run()
+    {
+      try
+      {
+        try
+        {
+          // Call the execute method appropriately
+          synchronized (this)
+          {
+            if (!abortThread)
+            {
+              try
+              {
+                response = httpClient.execute(executeMethod);
+              }
+              catch (java.net.SocketTimeoutException e)
+              {
+                responseException = e;
+              }
+              catch (ConnectTimeoutException e)
+              {
+                responseException = e;
+              }
+              catch (InterruptedIOException e)
+              {
+                throw e;
+              }
+              catch (Throwable e)
+              {
+                responseException = e;
+              }
+              this.notifyAll();
+            }
+          }
+          
+          // Start the transfer of the content
+          if (responseException == null)
+          {
+            synchronized (this)
+            {
+              if (!abortThread)
+              {
+                try
+                {
+                  HttpEntity entity = response.getEntity();
+                  InputStream bodyStream = entity.getContent();
+                  if (bodyStream != null)
+                  {
+                    threadStream = new XThreadInputStream(bodyStream);
+                    charSet = EntityUtils.getContentCharSet(entity);
+                  }
+                  streamCreated = true;
+                }
+                catch (java.net.SocketTimeoutException e)
+                {
+                  streamException = e;
+                }
+                catch (ConnectTimeoutException e)
+                {
+                  streamException = e;
+                }
+                catch (InterruptedIOException e)
+                {
+                  throw e;
+                }
+                catch (Throwable e)
+                {
+                  streamException = e;
+                }
+                this.notifyAll();
+              }
+            }
+          }
+          
+          if (responseException == null && streamException == null)
+          {
+            if (threadStream != null)
+            {
+              // Stuff the content until we are done
+              threadStream.stuffQueue();
+            }
+          }
+          
+        }
+        finally
+        {
+          synchronized (this)
+          {
+            try
+            {
+              executeMethod.abort();
+            }
+            catch (Throwable e)
+            {
+              shutdownException = e;
+            }
+            this.notifyAll();
+          }
+        }
+      }
+      catch (Throwable e)
+      {
+        // We catch exceptions here that should ONLY be InterruptedExceptions, as a result of the thread being aborted.
+        this.generalException = e;
+      }
     }
-    
-    @Override
-    public void consumeContent()
-      throws IOException {
-      EntityUtils.consume(this);
+
+    public int getResponseCode()
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait until the response object is there
+      while (true)
+      {
+        synchronized (this)
+        {
+          checkException(responseException);
+          if (response != null)
+            return response.getStatusLine().getStatusCode();
+          wait();
+        }
+      }
     }
-    
-    @Override
-    public boolean isRepeatable() {
-      return true;
+
+    public String getResponseStatus()
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait until the response object is there
+      while (true)
+      {
+        synchronized (this)
+        {
+          checkException(responseException);
+          if (response != null)
+            return response.getStatusLine().toString();
+          wait();
+        }
+      }
     }
 
-    @Override
-    public boolean isStreaming() {
-      return false;
+    public Map<String,List<String>> getResponseHeaders()
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait for the response object to appear
+      while (true)
+      {
+        synchronized (this)
+        {
+          checkException(responseException);
+          if (response != null)
+          {
+            Header[] headers = response.getAllHeaders();
+            Map<String,List<String>> rval = new HashMap<String,List<String>>();
+            int i = 0;
+            while (i < headers.length)
+            {
+              Header h = headers[i++];
+              String name = h.getName();
+              String value = h.getValue();
+              List<String> values = rval.get(name);
+              if (values == null)
+              {
+                values = new ArrayList<String>();
+                rval.put(name,values);
+              }
+              values.add(value);
+            }
+            return rval;
+          }
+          wait();
+        }
+      }
+
     }
     
-    @Override
-    public InputStream getContent()
-      throws IOException, IllegalStateException {
-      // MHL
-      return null;
+    public String getFirstHeader(String headerName)
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait for the response object to appear
+      while (true)
+      {
+        synchronized (this)
+        {
+          checkException(responseException);
+          if (response != null)
+          {
+            Header h = response.getFirstHeader(headerName);
+            if (h == null)
+              return null;
+            return h.getValue();
+          }
+          wait();
+        }
+      }
+    }
+
+    public InputStream getSafeInputStream()
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait until stream is created, or until we note an exception was thrown.
+      while (true)
+      {
+        synchronized (this)
+        {
+          if (responseException != null)
+            throw new IllegalStateException("Check for response before getting stream");
+          checkException(streamException);
+          if (streamCreated)
+            return threadStream;
+          wait();
+        }
+      }
     }
     
-    @Override
-    public void writeTo(OutputStream out)
-      throws IOException {
-      try {
-        this.message.writeTo(out);
-      } catch (SOAPException e) {
-        throw new IOException(e.getMessage());
+    public String getCharSet()
+      throws InterruptedException, IOException, HttpException
+    {
+      while (true)
+      {
+        synchronized (this)
+        {
+          if (responseException != null)
+            throw new IllegalStateException("Check for response before getting charset");
+          checkException(streamException);
+          if (streamCreated)
+            return charSet;
+          wait();
+        }
       }
     }
-
-    @Override
-    public long getContentLength() {
-      if (contentLengthNeeded) {
-        try {
-          return message.getContentLength();
-        } catch (Exception e) {
+    
+    public void abort()
+    {
+      // This will be called during the finally
+      // block in the case where all is well (and
+      // the stream completed) and in the case where
+      // there were exceptions.
+      synchronized (this)
+      {
+        if (streamCreated)
+        {
+          if (threadStream != null)
+            threadStream.abort();
         }
+        abortThread = true;
       }
-      // Unknown (chunked) length
-      return -1L;
     }
-
-    @Override
-    public Header getContentType() {
-      return null; // a separate header is added
+    
+    public void finishUp()
+      throws InterruptedException
+    {
+      join();
     }
-
-    @Override
-    public Header getContentEncoding() {
-      return null;
+    
+    protected synchronized void checkException(Throwable exception)
+      throws IOException, HttpException
+    {
+      if (exception != null)
+      {
+        // Rethrow the recorded exception (wrapped if it is not an IOException, HttpException, RuntimeException or Error).  It is NOT cleared here, so a later call will throw it again.
+        Throwable e = exception;
+        if (e instanceof IOException)
+          throw (IOException)e;
+        else if (e instanceof HttpException)
+          throw (HttpException)e;
+        else if (e instanceof RuntimeException)
+          throw (RuntimeException)e;
+        else if (e instanceof Error)
+          throw (Error)e;
+        else
+          throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
+      }
     }
+
   }
 
 }