You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/20 12:12:03 UTC

svn commit: r1411618 - in /camel/branches/camel-2.10.x: ./ components/camel-ahc/src/main/java/org/apache/camel/component/ahc/

Author: davsclaus
Date: Tue Nov 20 11:12:02 2012
New Revision: 1411618

URL: http://svn.apache.org/viewvc?rev=1411618&view=rev
Log:
CAMEL-5808: Added bufferSize option to camel-ahc, and fixed setting reference parameters on clientConfig using uri-style configuration.

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcComponent.java
    camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
    camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
    camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1411617

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcComponent.java?rev=1411618&r1=1411617&r2=1411618&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcComponent.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcComponent.java Tue Nov 20 11:12:02 2012
@@ -80,8 +80,9 @@ public class AhcComponent extends Header
             }
             
             // set and validate additional parameters on client config
-            IntrospectionSupport.setProperties(builder, parameters, CLIENT_CONFIG_PREFIX, true);
-            validateParameters(uri, parameters, CLIENT_CONFIG_PREFIX);
+            Map<String, Object> clientParams = IntrospectionSupport.extractProperties(parameters, CLIENT_CONFIG_PREFIX);
+            setProperties(builder, clientParams);
+            validateParameters(uri, clientParams, null);
             
             endpoint.setClientConfig(builder.build());
         }

Modified: camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java?rev=1411618&r1=1411617&r2=1411618&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java Tue Nov 20 11:12:02 2012
@@ -45,6 +45,7 @@ public class AhcEndpoint extends Default
     private boolean throwExceptionOnFailure = true;
     private boolean transferException;
     private SSLContextParameters sslContextParameters;
+    private int bufferSize = 4 * 1024;
 
     public AhcEndpoint(String endpointUri, AhcComponent component, URI httpUri) {
         super(endpointUri, component);
@@ -152,6 +153,14 @@ public class AhcEndpoint extends Default
         this.sslContextParameters = sslContextParameters;
     }
 
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();

Modified: camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java?rev=1411618&r1=1411617&r2=1411618&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcProducer.java Tue Nov 20 11:12:02 2012
@@ -51,7 +51,7 @@ public class AhcProducer extends Default
             // AHC supports async processing
             Request request = getEndpoint().getBinding().prepareRequest(getEndpoint(), exchange);
             log.debug("Executing request {} ", request);
-            client.prepareRequest(request).execute(new AhcAsyncHandler(exchange, callback, request.getUrl()));
+            client.prepareRequest(request).execute(new AhcAsyncHandler(exchange, callback, request.getUrl(), getEndpoint().getBufferSize()));
             return false;
         } catch (Exception e) {
             exchange.setException(e);
@@ -73,36 +73,43 @@ public class AhcProducer extends Default
         private int statusCode;
         private String statusText;
 
-        private AhcAsyncHandler(Exchange exchange, AsyncCallback callback, String url) {
+        private AhcAsyncHandler(Exchange exchange, AsyncCallback callback, String url, int bufferSize) {
             this.exchange = exchange;
             this.callback = callback;
             this.url = url;
-            this.os = new ByteArrayOutputStream();
+            this.os = new ByteArrayOutputStream(bufferSize);
         }
 
         @Override
         public void onThrowable(Throwable t) {
-            log.trace("{} onThrowable {}", exchange.getExchangeId(), t);
+            if (log.isTraceEnabled()) {
+                log.trace("{} onThrowable {}", exchange.getExchangeId(), t);
+            }
             try {
                 getEndpoint().getBinding().onThrowable(getEndpoint(), exchange, t);
             } catch (Exception e) {
                 exchange.setException(e);
+            } finally {
+                callback.done(false);
             }
-            callback.done(false);
         }
 
         @Override
         public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
             // write body parts to stream, which we will bind to the Camel Exchange in onComplete
             int wrote = bodyPart.writeTo(os);
-            log.trace("{} onBodyPartReceived {} bytes", exchange.getExchangeId(), wrote);
+            if (log.isTraceEnabled()) {
+                log.trace("{} onBodyPartReceived {} bytes", exchange.getExchangeId(), wrote);
+            }
             contentLength += wrote;
             return STATE.CONTINUE;
         }
 
         @Override
         public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
-            log.trace("{} onStatusReceived {}", exchange.getExchangeId(), responseStatus);
+            if (log.isTraceEnabled()) {
+                log.trace("{} onStatusReceived {}", exchange.getExchangeId(), responseStatus);
+            }
             try {
                 statusCode = responseStatus.getStatusCode();
                 statusText = responseStatus.getStatusText();
@@ -115,7 +122,9 @@ public class AhcProducer extends Default
 
         @Override
         public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
-            log.trace("{} onHeadersReceived {}", exchange.getExchangeId(), headers);
+            if (log.isTraceEnabled()) {
+                log.trace("{} onHeadersReceived {}", exchange.getExchangeId(), headers);
+            }
             try {
                 getEndpoint().getBinding().onHeadersReceived(getEndpoint(), exchange, headers);
             } catch (Exception e) {
@@ -126,7 +135,9 @@ public class AhcProducer extends Default
 
         @Override
         public Exchange onCompleted() throws Exception {
-            log.trace("{} onCompleted", exchange.getExchangeId());
+            if (log.isTraceEnabled()) {
+                log.trace("{} onCompleted", exchange.getExchangeId());
+            }
             try {
                 getEndpoint().getBinding().onComplete(getEndpoint(), exchange, url, os, contentLength, statusCode, statusText);
             } catch (Exception e) {

Modified: camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java?rev=1411618&r1=1411617&r2=1411618&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/DefaultAhcBinding.java Tue Nov 20 11:12:02 2012
@@ -96,7 +96,9 @@ public class DefaultAhcBinding implement
         for (Map.Entry<String, Object> entry : exchange.getIn().getHeaders().entrySet()) {
             String headerValue = exchange.getIn().getHeader(entry.getKey(), String.class);
             if (strategy != null && !strategy.applyFilterToCamelHeaders(entry.getKey(), headerValue, exchange)) {
-                log.trace("Adding header {} = {}", entry.getKey(), headerValue);
+                if (log.isTraceEnabled()) {
+                    log.trace("Adding header {} = {}", entry.getKey(), headerValue);
+                }
                 builder.addHeader(entry.getKey(), headerValue);
             }
         }
@@ -120,7 +122,7 @@ public class DefaultAhcBinding implement
                         // serialized java object
                         Serializable obj = in.getMandatoryBody(Serializable.class);
                         // write object to output stream
-                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                        ByteArrayOutputStream bos = new ByteArrayOutputStream(endpoint.getBufferSize());
                         AhcHelper.writeObjectToStream(bos, obj);
                         byte[] bytes = bos.toByteArray();
                         body = new ByteArrayBodyGenerator(bytes);