You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/03/01 00:02:46 UTC

svn commit: r381808 - in /incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http: HttpConfiguration.java HttpConfigurationMBean.java HttpLifeCycle.java processors/ProviderProcessor.java

Author: gnodet
Date: Tue Feb 28 15:02:44 2006
New Revision: 381808

URL: http://svn.apache.org/viewcvs?rev=381808&view=rev
Log:
Changes to the servicemix-http component:
 * now uses jetty nio connector by default for scalability
 * can configure client side parameteres (requests per host, total requests)
 * release the http client connection when finished

Modified:
    incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfiguration.java
    incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfigurationMBean.java
    incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpLifeCycle.java
    incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ProviderProcessor.java

Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfiguration.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfiguration.java?rev=381808&r1=381807&r2=381808&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfiguration.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfiguration.java Tue Feb 28 15:02:44 2006
@@ -16,10 +16,11 @@
 package org.apache.servicemix.http;
 
 import org.apache.servicemix.common.PersistentConfiguration;
+import org.mortbay.jetty.nio.SelectChannelConnector;
 
 public class HttpConfiguration extends PersistentConfiguration implements HttpConfigurationMBean {
 
-    public static final String DEFAULT_JETTY_CONNECTOR_CLASS_NAME = org.mortbay.jetty.bio.SocketConnector.class.getName();
+    public static final String DEFAULT_JETTY_CONNECTOR_CLASS_NAME = SelectChannelConnector.class.getName();
     
     private boolean streamingEnabled = false;
     private String jettyConnectorClassName = DEFAULT_JETTY_CONNECTOR_CLASS_NAME;
@@ -29,6 +30,16 @@
      * to 255 by default to match the default value in Jetty. 
      */
     private int jettyThreadPoolSize = 255;
+    
+    /**
+     * Maximum number of concurrent requests to the same host.
+     */
+    private int maxConnectionsPerHost = 32;
+    
+    /**
+     * Maximum number of concurrent requests.
+     */
+    private int maxTotalConnections = 256;
 
     public boolean isStreamingEnabled() {
         return streamingEnabled;
@@ -57,10 +68,30 @@
         save();
     }
     
+    public int getMaxConnectionsPerHost() {
+        return maxConnectionsPerHost;
+    }
+
+    public void setMaxConnectionsPerHost(int maxConnectionsPerHost) {
+        this.maxConnectionsPerHost = maxConnectionsPerHost;
+        save();
+    }
+
+    public int getMaxTotalConnections() {
+        return maxTotalConnections;
+    }
+
+    public void setMaxTotalConnections(int maxTotalConnections) {
+        this.maxTotalConnections = maxTotalConnections;
+        save();
+    }
+    
     public void save() {
         properties.setProperty("jettyThreadPoolSize", Integer.toString(jettyThreadPoolSize));
         properties.setProperty("jettyConnectorClassName", jettyConnectorClassName);
         properties.setProperty("streamingEnabled", Boolean.toString(streamingEnabled));
+        properties.setProperty("maxConnectionsPerHost", Integer.toString(maxConnectionsPerHost));
+        properties.setProperty("maxTotalConnections", Integer.toString(maxTotalConnections));
         super.save();
     }
     
@@ -75,10 +106,16 @@
             if (properties.getProperty("streamingEnabled") != null) {
                 streamingEnabled = Boolean.valueOf(properties.getProperty("streamingEnabled")).booleanValue();
             }
+            if (properties.getProperty("maxConnectionsPerHost") != null) {
+                maxConnectionsPerHost = Integer.parseInt(properties.getProperty("maxConnectionsPerHost"));
+            }
+            if (properties.getProperty("maxTotalConnections") != null) {
+                maxTotalConnections = Integer.parseInt(properties.getProperty("maxTotalConnections"));
+            }
             return true;
         } else {
             return false;
         }
     }
-    
+
 }

Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfigurationMBean.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfigurationMBean.java?rev=381808&r1=381807&r2=381808&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfigurationMBean.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpConfigurationMBean.java Tue Feb 28 15:02:44 2006
@@ -29,4 +29,12 @@
 
     public void setJettyThreadPoolSize(int jettyThreadPoolSize);
     
+    public int getMaxConnectionsPerHost();
+    
+    public void setMaxConnectionsPerHost(int maxConnectionsPerHost);
+    
+    public int getMaxTotalConnections();
+    
+    public void setMaxTotalConnections(int maxTotalConnections);
+    
 }

Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpLifeCycle.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpLifeCycle.java?rev=381808&r1=381807&r2=381808&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/HttpLifeCycle.java Tue Feb 28 15:02:44 2006
@@ -17,6 +17,7 @@
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
 import org.apache.servicemix.common.BaseComponent;
 import org.apache.servicemix.common.BaseLifeCycle;
 
@@ -73,6 +74,10 @@
         }
         if (client == null) {
             connectionManager = new MultiThreadedHttpConnectionManager();
+            HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+            params.setDefaultMaxConnectionsPerHost(configuration.getMaxConnectionsPerHost());
+            params.setMaxTotalConnections(configuration.getMaxTotalConnections());
+            connectionManager.setParams(params);
             client = new HttpClient(connectionManager);
         }
     }

Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ProviderProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ProviderProcessor.java?rev=381808&r1=381807&r2=381808&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ProviderProcessor.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ProviderProcessor.java Tue Feb 28 15:02:44 2006
@@ -102,51 +102,55 @@
         }
         method.addRequestHeader("Content-Type", writer.getContentType());
         method.setRequestEntity(writeMessage(writer));
-        int response = getClient().executeMethod(host, method);
-        if (response != HttpStatus.SC_OK) {
-        	if (exchange instanceof InOnly == false) {
-        		Fault fault = exchange.createFault();
+        try {
+            int response = getClient().executeMethod(host, method);
+            if (response != HttpStatus.SC_OK) {
+            	if (exchange instanceof InOnly == false) {
+            		Fault fault = exchange.createFault();
+                    SoapReader reader = soapMarshaler.createReader();
+                    Header contentType = method.getResponseHeader("Content-Type");
+                    soapMessage = reader.read(method.getResponseBodyAsStream(), 
+                    						  contentType != null ? contentType.getValue() : null);
+                    fault.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
+            		jbiMarshaler.toNMS(fault, soapMessage);
+            		exchange.setFault(fault);
+            		exchange.setStatus(ExchangeStatus.ERROR);
+            		channel.send(exchange);
+            		return;
+            	} else {
+            		throw new Exception("Invalid status response: " + response);
+            	}
+            }
+            if (exchange instanceof InOut) {
+                NormalizedMessage msg = exchange.createMessage();
                 SoapReader reader = soapMarshaler.createReader();
                 Header contentType = method.getResponseHeader("Content-Type");
                 soapMessage = reader.read(method.getResponseBodyAsStream(), 
                 						  contentType != null ? contentType.getValue() : null);
-                fault.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
-        		jbiMarshaler.toNMS(fault, soapMessage);
-        		exchange.setFault(fault);
-        		exchange.setStatus(ExchangeStatus.ERROR);
-        		channel.send(exchange);
-        		return;
-        	} else {
-        		throw new Exception("Invalid status response: " + response);
-        	}
-        }
-        if (exchange instanceof InOut) {
-            NormalizedMessage msg = exchange.createMessage();
-            SoapReader reader = soapMarshaler.createReader();
-            Header contentType = method.getResponseHeader("Content-Type");
-            soapMessage = reader.read(method.getResponseBodyAsStream(), 
-            						  contentType != null ? contentType.getValue() : null);
-            msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
-            jbiMarshaler.toNMS(msg, soapMessage);
-            ((InOut) exchange).setOutMessage(msg);
-            channel.sendSync(exchange);
-        } else if (exchange instanceof InOptionalOut) {
-            if (method.getResponseContentLength() == 0) {
-                exchange.setStatus(ExchangeStatus.DONE);
-                channel.send(exchange);
-            } else {
-                NormalizedMessage msg = exchange.createMessage();
-                SoapReader reader = soapMarshaler.createReader();
-                soapMessage = reader.read(method.getResponseBodyAsStream(), 
-                                          method.getResponseHeader("Content-Type").getValue());
                 msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
                 jbiMarshaler.toNMS(msg, soapMessage);
-                ((InOptionalOut) exchange).setOutMessage(msg);
+                ((InOut) exchange).setOutMessage(msg);
                 channel.sendSync(exchange);
+            } else if (exchange instanceof InOptionalOut) {
+                if (method.getResponseContentLength() == 0) {
+                    exchange.setStatus(ExchangeStatus.DONE);
+                    channel.send(exchange);
+                } else {
+                    NormalizedMessage msg = exchange.createMessage();
+                    SoapReader reader = soapMarshaler.createReader();
+                    soapMessage = reader.read(method.getResponseBodyAsStream(), 
+                                              method.getResponseHeader("Content-Type").getValue());
+                    msg.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(method));
+                    jbiMarshaler.toNMS(msg, soapMessage);
+                    ((InOptionalOut) exchange).setOutMessage(msg);
+                    channel.sendSync(exchange);
+                }
+            } else {
+                exchange.setStatus(ExchangeStatus.DONE);
+                channel.send(exchange);
             }
-        } else {
-            exchange.setStatus(ExchangeStatus.DONE);
-            channel.send(exchange);
+        } finally {
+            method.releaseConnection();
         }
     }