You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by br...@apache.org on 2010/11/05 18:14:53 UTC

svn commit: r1031668 - in /thrift/trunk/lib/java: ivy.xml src/org/apache/thrift/transport/THttpClient.java

Author: bryanduxbury
Date: Fri Nov  5 17:14:52 2010
New Revision: 1031668

URL: http://svn.apache.org/viewvc?rev=1031668&view=rev
Log:
THRIFT-970. java: Under heavy load, THttpClient may fail with 'too many open files'

This patch updates our THttpClient to have two different modes of operation: its current functionality and a new mode that uses Apache's HttpClient library to provide higher throughput and better pooling functionality.

Patch: Mathias Herberts

Modified:
    thrift/trunk/lib/java/ivy.xml
    thrift/trunk/lib/java/src/org/apache/thrift/transport/THttpClient.java

Modified: thrift/trunk/lib/java/ivy.xml
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/java/ivy.xml?rev=1031668&r1=1031667&r2=1031668&view=diff
==============================================================================
--- thrift/trunk/lib/java/ivy.xml (original)
+++ thrift/trunk/lib/java/ivy.xml Fri Nov  5 17:14:52 2010
@@ -33,5 +33,6 @@
        <dependency org="commons-lang" name="commons-lang" rev="2.5" conf="* -> *,!sources,!javadoc"/>
        <dependency org="junit" name="junit" rev="4.4" conf="test -> *,!sources,!javadoc"/>
        <dependency org="javax.servlet" name="servlet-api" rev="2.5" conf="* -> *,!sources,!javadoc"/>
+       <dependency org="org.apache.httpcomponents" name="httpclient" rev="4.0.1" conf="* -> *,!sources,!javadoc"/>
     </dependencies>
 </ivy-module>

Modified: thrift/trunk/lib/java/src/org/apache/thrift/transport/THttpClient.java
URL: http://svn.apache.org/viewvc/thrift/trunk/lib/java/src/org/apache/thrift/transport/THttpClient.java?rev=1031668&r1=1031667&r2=1031668&view=diff
==============================================================================
--- thrift/trunk/lib/java/src/org/apache/thrift/transport/THttpClient.java (original)
+++ thrift/trunk/lib/java/src/org/apache/thrift/transport/THttpClient.java Fri Nov  5 17:14:52 2010
@@ -19,6 +19,7 @@
 
 package org.apache.thrift.transport;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.IOException;
@@ -28,17 +29,46 @@ import java.net.HttpURLConnection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.params.CoreConnectionPNames;
+
 /**
  * HTTP implementation of the TTransport interface. Used for working with a
- * Thrift web services implementation.
+ * Thrift web services implementation (using for example TServlet).
+ *
+ * This class offers two implementations of the HTTP transport.
+ * One uses HttpURLConnection instances, the other HttpClient from Apache
+ * HttpComponents.
+ * The chosen implementation depends on the constructor used to
+ * create the THttpClient instance.
+ * Using the THttpClient(String url) constructor or passing null as the
+ * HttpClient to THttpClient(String url, HttpClient client) will create an
+ * instance which will use HttpURLConnection.
+ *
+ * When using HttpClient, the following configuration leads to 5-15% 
+ * better performance than the HttpURLConnection implementation:
+ *
+ * http.protocol.version=HttpVersion.HTTP_1_1
+ * http.protocol.content-charset=UTF-8
+ * http.protocol.expect-continue=false
+ * http.connection.stalecheck=false
  *
+ * Also note that under high load, the HttpURLConnection implementation
+ * may exhaust the open file descriptor limit.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a>
  */
+
 public class THttpClient extends TTransport {
 
   private URL url_ = null;
 
-  private final ByteArrayOutputStream requestBuffer_ =
-    new ByteArrayOutputStream();
+  private final ByteArrayOutputStream requestBuffer_ = new ByteArrayOutputStream();
 
   private InputStream inputStream_ = null;
 
@@ -48,9 +78,54 @@ public class THttpClient extends TTransp
 
   private Map<String,String> customHeaders_ = null;
 
+  private final HttpHost host;
+  
+  private final HttpClient client;
+  
+  /** Factory for THttpClient transports bound to one URL and an optional HttpClient. */
+  public static class Factory extends TTransportFactory {
+    private final String url;
+    private final HttpClient client;
+
+    /** Builds a factory whose transports are created with THttpClient(String). */
+    public Factory(String url) {
+      this(url, null);
+    }
+
+    /** Builds a factory whose transports use the given client when it is non-null. */
+    public Factory(String url, HttpClient client) {
+      this.url = url;
+      this.client = client;
+    }
+
+    /**
+     * Creates a fresh THttpClient (the argument is unused); returns null if construction fails.
+     */
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      try {
+        return (null == client) ? new THttpClient(url) : new THttpClient(url, client);
+      } catch (TTransportException tte) {
+        return null;
+      }
+    }
+  }
+
   public THttpClient(String url) throws TTransportException {
     try {
       url_ = new URL(url);
+      this.client = null; // no HttpClient: flush() will not call flushUsingHttpClient()
+      this.host = null; // the HttpHost is only built by the constructor that takes an HttpClient
+    } catch (IOException iox) {
+      throw new TTransportException(iox); // e.g. the URL string could not be parsed
+    }
+  }
+
+  public THttpClient(String url, HttpClient client) throws TTransportException {
+    try {
+      url_ = new URL(url);
+      this.client = client;
+      this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol());
     } catch (IOException iox) {
       throw new TTransportException(iox);
     }
@@ -58,10 +133,20 @@ public class THttpClient extends TTransp
 
   public void setConnectTimeout(int timeout) {
     connectTimeout_ = timeout; // remembered on the instance whether or not an HttpClient is set
+    if (null != this.client) {
+      // WARNING: this changes the params of the HttpClient itself, so the new connect timeout
+      // may also affect any other code that uses the same HttpClient instance.
+      client.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout_);
+    }
   }
 
   public void setReadTimeout(int timeout) {
     readTimeout_ = timeout; // remembered on the instance whether or not an HttpClient is set
+    if (null != this.client) {
+      // WARNING: this changes the params of the HttpClient itself, so the new socket (read)
+      // timeout may also affect any other code that uses the same HttpClient instance.
+      client.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout_);
+    }
   }
 
   public void setCustomHeaders(Map<String,String> headers) {
@@ -111,7 +196,102 @@ public class THttpClient extends TTransp
     requestBuffer_.write(buf, off, len);
   }
 
+  /**
+   * Sends the buffered request as an HTTP POST through the configured
+   * HttpClient and buffers the complete response body in inputStream_.
+   *
+   * @throws TTransportException if no HttpClient is set, the response status
+   *         is not SC_OK, or an I/O error occurs while sending or reading
+   */
+  private void flushUsingHttpClient() throws TTransportException {
+    if (null == this.client) {
+      throw new TTransportException("Null HttpClient, aborting.");
+    }
+
+    // Extract request and reset buffer
+    byte[] data = requestBuffer_.toByteArray();
+    requestBuffer_.reset();
+
+    HttpPost post = null;
+    InputStream is = null;
+    try {
+      // Set request to path + query string
+      post = new HttpPost(this.url_.getFile());
+
+      // Headers are added to the HttpPost instance, not to HttpClient.
+      post.setHeader("Content-Type", "application/x-thrift");
+      post.setHeader("Accept", "application/x-thrift");
+      post.setHeader("User-Agent", "Java/THttpClient/HC");
+
+      if (null != customHeaders_) {
+        for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
+          post.setHeader(header.getKey(), header.getValue());
+        }
+      }
+
+      post.setEntity(new ByteArrayEntity(data));
+      HttpResponse response = this.client.execute(this.host, post);
+      int responseCode = response.getStatusLine().getStatusCode();
+
+      // Grab the stream BEFORE checking the status so the finally clause closes
+      // it and releases the connection even when the server reports an error.
+      if (null != response.getEntity()) {
+        is = response.getEntity().getContent();
+      }
+
+      if (responseCode != HttpStatus.SC_OK) {
+        throw new TTransportException("HTTP Response code: " + responseCode);
+      }
+
+      // Read the whole response into a byte array so we can release the
+      // connection early. The content is thus held in memory, momentarily twice
+      // (while the thrift struct is being read up the chain). Doing otherwise
+      // might exhaust connections and thus lead to app failure.
+      byte[] buf = new byte[1024];
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+      int len = 0;
+      do {
+        len = is.read(buf);
+        if (len > 0) {
+          baos.write(buf, 0, len);
+        }
+      } while (-1 != len);
+
+      try {
+        // Indicate we're done with the content.
+        response.getEntity().consumeContent();
+      } catch (IOException ioe) {
+        // We ignore this exception, it might only mean the server has no
+        // keep-alive capability.
+      }
+
+      inputStream_ = new ByteArrayInputStream(baos.toByteArray());
+    } catch (IOException ioe) {
+      // Abort method so the connection gets released back to the connection manager
+      if (null != post) {
+        post.abort();
+      }
+      throw new TTransportException(ioe);
+    } finally {
+      if (null != is) {
+        // Close the entity's input stream, this will release the underlying connection
+        try {
+          is.close();
+        } catch (IOException ioe) {
+          throw new TTransportException(ioe);
+        }
+      }
+    }
+  }
+
   public void flush() throws TTransportException {
+
+    if (null != this.client) {
+      flushUsingHttpClient();
+      return;
+    }
+
     // Extract request and reset buffer
     byte[] data = requestBuffer_.toByteArray();
     requestBuffer_.reset();