You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/10/03 14:38:25 UTC

[3/3] camel git commit: CAMEL-11696: Add advanced properties to the camel-thrift component

CAMEL-11696: Add advanced properties to the camel-thrift component

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5a501706
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5a501706
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5a501706

Branch: refs/heads/master
Commit: 5a501706c92ba05d28a416e9ec9eba07f66f1a1a
Parents: 971c80b
Author: Dmitry Volodin <dm...@gmail.com>
Authored: Mon Oct 2 14:42:36 2017 +0300
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Oct 3 16:37:50 2017 +0200

----------------------------------------------------------------------
 components/camel-thrift/README.md               |  31 ++
 components/camel-thrift/ReadMe.md               |  31 --
 .../src/main/docs/thrift-component.adoc         |  21 +-
 .../component/thrift/ThriftCompressionType.java |  32 +++
 .../component/thrift/ThriftConfiguration.java   |  76 ++++-
 .../camel/component/thrift/ThriftConstants.java |   3 +
 .../camel/component/thrift/ThriftConsumer.java  | 108 +++++--
 .../thrift/ThriftExchangeProtocol.java          |  42 +++
 .../component/thrift/ThriftNegotiationType.java |  37 +++
 .../camel/component/thrift/ThriftProducer.java  |  56 +++-
 .../thrift/ThriftSSLConfiguration.java          | 183 ++++++++++++
 .../camel/component/thrift/ThriftUtils.java     |  65 ++++-
 .../thrift/server/ThriftHsHaServer.java         |  12 -
 .../thrift/server/ThriftThreadPoolServer.java   | 283 +++++++++++++++++++
 .../thrift/ThriftConsumerSecurityTest.java      | 143 ++++++++++
 .../ThriftConsumerZlibCompressionTest.java      | 126 +++++++++
 .../thrift/ThriftProducerBaseTest.java          | 142 +---------
 .../thrift/ThriftProducerSecurityTest.java      | 211 ++++++++++++++
 .../ThriftProducerZlibCompressionTest.java      | 126 +++++++++
 .../thrift/impl/CalculatorAsyncServerImpl.java  |  94 ++++++
 .../thrift/impl/CalculatorSyncServerImpl.java   |  89 ++++++
 .../local/ThriftThreadPoolServerTest.java       | 106 +++++++
 .../src/test/resources/certs/README.md          |  13 +
 .../src/test/resources/certs/keystore.jks       | Bin 0 -> 2231 bytes
 .../src/test/resources/certs/truststore.jks     | Bin 0 -> 942 bytes
 .../camel-thrift/src/test/thrift/README.md      |   9 +
 .../camel-thrift/src/test/thrift/readme.txt     |   6 -
 27 files changed, 1811 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/README.md
----------------------------------------------------------------------
diff --git a/components/camel-thrift/README.md b/components/camel-thrift/README.md
new file mode 100644
index 0000000..668e452
--- /dev/null
+++ b/components/camel-thrift/README.md
@@ -0,0 +1,31 @@
+# How to upgrade Apache Thrift
+
+You need to install the thrift compiler from
+
+    https://github.com/apache/thrift/releases
+
+For linux/osx you download the .tar distro, and untar it, and then
+
+    sudo ./bootstrap.sh
+    export CXXFLAGS='-Os -ffunction-sections -Wl,--gc-sections -fno-asynchronous-unwind-tables -Wl,--strip-all'
+    sudo ./configure --without-c_glib --without-java --without-python --without-ruby --without-nodejs --disable-libs --disable-tests --disable-tutorial --disable-shared --enable-static
+    sudo make check
+    sudo make install
+
+If it's successful, you can type
+
+    thrift --version
+
+To display the version of the thrift compiler.
+
+You then need to compile the sample test source for the `camel-thrift` component.
+
+The sample test source is an example taken from the Thrift Java tutorial at: https://thrift.apache.org/tutorial/java
+
+    cd components/camel-thrift
+    cd src/test/thrift
+    thrift -r --gen java -out ../java/ ./tutorial-dataformat.thrift
+    thrift -r --gen java -out ../java/ ./tutorial-component.thrift
+
+The generated source code will overwrite the existing files.
+

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/ReadMe.md
----------------------------------------------------------------------
diff --git a/components/camel-thrift/ReadMe.md b/components/camel-thrift/ReadMe.md
deleted file mode 100644
index 668e452..0000000
--- a/components/camel-thrift/ReadMe.md
+++ /dev/null
@@ -1,31 +0,0 @@
-# How to upgrade Apache Thrift
-
-You need to install the thrift compiler from
-
-    https://github.com/apache/thrift/releases
-
-For linux/osx you download the .tar distro, and untar it, and then
-
-    sudo ./bootstrap.sh
-    export CXXFLAGS='-Os -ffunction-sections -Wl,--gc-sections -fno-asynchronous-unwind-tables -Wl,--strip-all'
-    sudo ./configure --without-c_glib --without-java --without-python --without-ruby --without-nodejs --disable-libs --disable-tests --disable-tutorial --disable-shared --enable-static
-    sudo ./make check
-    sudo ./make install
-
-If its succesful, you can type
-
-    thrift --version
-
-To display the version of the thrift compiler.
-
-You then need to compile the sample test source for the `camel-thrift` component.
-
-The sample test source is an example taken from the Thrift Java tutorial at: https://thrift.apache.org/tutorial/java
-
-    cd components/camel-thrift
-    cd src/test/thrift
-    thrift -r --gen java -out ../java/ ./tutorial-dataformat.thrift
-    thrift -r --gen java -out ../java/ ./tutorial-component.thrift
-
-The generate source code will override the existing.
-

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/docs/thrift-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/docs/thrift-component.adoc b/components/camel-thrift/src/main/docs/thrift-component.adoc
index c5954fb..8dcf10e2 100644
--- a/components/camel-thrift/src/main/docs/thrift-component.adoc
+++ b/components/camel-thrift/src/main/docs/thrift-component.adoc
@@ -45,23 +45,38 @@ with the following path and query parameters:
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *host* | The Thrift server host name. This is localhost or 0.0.0.0 (if not defined) when being a consumer or remote server hostname when using producer. |  | String
+| *host* | The Thrift server host name. This is localhost or 0.0.0.0 (if not defined) when being a consumer or remote server host name when using producer. |  | String
 | *port* | *Required* The Thrift server port |  | int
-| *service* | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) |  | String
+| *service* | *Required* Fully qualified service name from the thrift descriptor file (package dot service definition name) |  | String
 |===
 
-==== Query Parameters (7 parameters):
+==== Query Parameters (22 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *compressionType* (common) | Protocol compression mechanism type | NONE | ThriftCompressionType
+| *exchangeProtocol* (common) | Exchange protocol serialization type | BINARY | ThriftExchangeProtocol
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *clientTimeout* (consumer) | Client timeout for consumers |  | int
 | *maxPoolSize* (consumer) | The Thrift server consumer max thread pool size | 10 | int
 | *poolSize* (consumer) | The Thrift server consumer initial thread pool size | 1 | int
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *method* (producer) | The Thrift invoked method name |  | String
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *cipherSuites* (security) | Cipher suites array |  | String[]
+| *keyManagerType* (security) | Key store manager type |  | String
+| *keyStorePassword* (security) | Key store password |  | String
+| *keyStorePath* (security) | Path to the key store file |  | String
+| *keyStoreType* (security) | Key store type | JKS | String
+| *negotiationType* (security) | Security negotiation type | PLAINTEXT | ThriftNegotiationType
+| *requireClientAuth* (security) | Set if client authentication is required | false | boolean
+| *securityProtocol* (security) | Security negotiation protocol | TLS | String
+| *trustManagerType* (security) | Trust store manager type |  | String
+| *trustPassword* (security) | Trust store password |  | String
+| *trustStorePath* (security) | Path to the trust store file |  | String
+| *trustStoreType* (security) | Trust store type | JKS | String
 |===
 // endpoint options: END
 

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftCompressionType.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftCompressionType.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftCompressionType.java
new file mode 100644
index 0000000..d41b1d2
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftCompressionType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+/**
+ * Protocol compression mechanism types enum
+ */
+public enum ThriftCompressionType {
+    /**
+     * No compression defined
+     */
+    NONE,
+        
+    /**
+     * Standard Java utility zlib implementation
+     */
+    ZLIB,
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
index 2fcabf9..0255a42 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
@@ -40,6 +40,21 @@ public class ThriftConfiguration {
 
     @UriParam(label = "producer")
     private String method;
+    
+    @UriParam(defaultValue = "BINARY")
+    private ThriftExchangeProtocol exchangeProtocol = ThriftExchangeProtocol.BINARY;
+    
+    @UriParam(label = "security", defaultValue = "PLAINTEXT")
+    private ThriftNegotiationType negotiationType = ThriftNegotiationType.PLAINTEXT;
+    
+    @UriParam(label = "security")
+    private ThriftSSLConfiguration sslConfiguration;
+    
+    @UriParam(defaultValue = "NONE")
+    private ThriftCompressionType compressionType = ThriftCompressionType.NONE;
+    
+    @UriParam(label = "consumer")
+    private int clientTimeout;
 
     @UriParam(label = "consumer", defaultValue = "" + ThriftConstants.THRIFT_CONSUMER_POOL_SIZE)
     private int poolSize = ThriftConstants.THRIFT_CONSUMER_POOL_SIZE;
@@ -48,7 +63,7 @@ public class ThriftConfiguration {
     private int maxPoolSize = ThriftConstants.THRIFT_CONSUMER_MAX_POOL_SIZE;
 
     /**
-     * Fully qualified service name from the protocol buffer descriptor file
+     * Fully qualified service name from the thrift descriptor file
      * (package dot service definition name)
      */
     public String getService() {
@@ -69,10 +84,54 @@ public class ThriftConfiguration {
     public void setMethod(String method) {
         this.method = method;
     }
+    
+    /**
+     * Exchange protocol serialization type
+     */
+    public ThriftExchangeProtocol getExchangeProtocol() {
+        return exchangeProtocol;
+    }
+
+    public void setExchangeProtocol(ThriftExchangeProtocol exchangeProtocol) {
+        this.exchangeProtocol = exchangeProtocol;
+    }
+
+    /**
+     * Security negotiation type
+     */
+    public ThriftNegotiationType getNegotiationType() {
+        return negotiationType;
+    }
+
+    public void setNegotiationType(ThriftNegotiationType negotiationType) {
+        this.negotiationType = negotiationType;
+    }
+
+    /**
+     * Configuration parameters for SSL/TLS security negotiation
+     */
+    public ThriftSSLConfiguration getSslConfiguration() {
+        return sslConfiguration;
+    }
+
+    public void setSslConfiguration(ThriftSSLConfiguration sslConfiguration) {
+        this.sslConfiguration = sslConfiguration;
+    }
+    
+    /**
+     * Protocol compression mechanism type
+     */
+    public ThriftCompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    public void setCompressionType(ThriftCompressionType compressionType) {
+        this.compressionType = compressionType;
+    }
 
     /**
      * The Thrift server host name. This is localhost or 0.0.0.0 (if not
-     * defined) when being a consumer or remote server hostname when using
+     * defined) when being a consumer or remote server host name when using
      * producer.
      */
     public String getHost() {
@@ -93,6 +152,17 @@ public class ThriftConfiguration {
     public void setPort(int port) {
         this.port = port;
     }
+    
+    /**
+     * Client timeout for consumers
+     */
+    public int getClientTimeout() {
+        return clientTimeout;
+    }
+
+    public void setClientTimeout(int clientTimeout) {
+        this.clientTimeout = clientTimeout;
+    }
 
     /**
      * The Thrift server consumer initial thread pool size
@@ -124,5 +194,5 @@ public class ThriftConfiguration {
         }
         
         setService(uri.getPath().substring(1));
-    }
+    }    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
index 3656c75..b229c47 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
@@ -30,6 +30,9 @@ public interface ThriftConstants {
     String THRIFT_SERVER_ASYNC_INTERFACE_NAME = "AsyncIface";
     String THRIFT_SERVER_ASYNC_PROCESSOR_CLASS = "AsyncProcessor";
     
+    String THRIFT_DEFAULT_SECURITY_PROTOCOL = "TLS";
+    String THRIFT_DEFAULT_SECURITY_STORE_TYPE = "JKS";
+    
     int THRIFT_CONSUMER_POOL_SIZE = 1;
     int THRIFT_CONSUMER_MAX_POOL_SIZE = 10;
     /*

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
index efa916e..68706c0 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.thrift;
 
 import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 
 import javassist.util.proxy.MethodHandler;
 import javassist.util.proxy.Proxy;
@@ -26,14 +28,17 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.thrift.server.ThriftHsHaServer;
-import org.apache.camel.component.thrift.server.ThriftHsHaServer.Args;
 import org.apache.camel.component.thrift.server.ThriftMethodHandler;
+import org.apache.camel.component.thrift.server.ThriftThreadPoolServer;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TZlibTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +48,8 @@ import org.slf4j.LoggerFactory;
 public class ThriftConsumer extends DefaultConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumer.class);
 
-    private TNonblockingServerSocket serverTransport;
+    private TNonblockingServerSocket asyncServerTransport;
+    private TServerSocket syncServerTransport;
     private TServer server;
     private final ThriftConfiguration configuration;
     private final ThriftEndpoint endpoint;
@@ -65,7 +71,7 @@ public class ThriftConsumer extends DefaultConsumer {
             LOG.debug("Starting the Thrift server");
             initializeServer();
             server.serve();
-            LOG.info("Thrift server started and listening on port: {}", serverTransport.getPort());
+            LOG.info("Thrift server started and listening on port: {}", asyncServerTransport == null ? syncServerTransport.getServerSocket().getLocalPort() : asyncServerTransport.getPort());
         }
     }
 
@@ -74,8 +80,15 @@ public class ThriftConsumer extends DefaultConsumer {
         if (server != null) {
             LOG.debug("Terminating Thrift server");
             server.stop();
-            serverTransport.close();
-            serverTransport = null;
+            if (ObjectHelper.isNotEmpty(asyncServerTransport)) {
+                asyncServerTransport.close();
+                asyncServerTransport = null;
+            }
+            if (ObjectHelper.isNotEmpty(syncServerTransport)) {
+                syncServerTransport.close();
+                syncServerTransport = null;
+            }
+            server.stop();
             server = null;
         }
         super.doStop();
@@ -102,23 +115,78 @@ public class ThriftConsumer extends DefaultConsumer {
             throw new IllegalArgumentException("Unable to create server implementation proxy service for " + configuration.getService());
         }
 
-        if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
-            LOG.debug("Building Thrift server on {}:{}", configuration.getHost(), configuration.getPort());
-            serverTransport = new TNonblockingServerSocket(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-        } else if (ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
-            LOG.debug("Building Thrift server on <any address>:{}", configuration.getPort());
-            serverTransport = new TNonblockingServerSocket(configuration.getPort());
+        if (configuration.getNegotiationType() == ThriftNegotiationType.SSL && endpoint.isSynchronous()) {
+            ThriftSSLConfiguration sslConfiguration = configuration.getSslConfiguration();
+            if (sslConfiguration == null) {
+                throw new IllegalArgumentException("SSL Configuration must be initialized if negotiation type is set to " + configuration.getNegotiationType());
+            }
+
+            ObjectHelper.notNull(sslConfiguration.getSecurityProtocol(), "Security protocol");
+            ObjectHelper.notNull(sslConfiguration.getKeyStorePath(), "Keystore path");
+            ObjectHelper.notNull(sslConfiguration.getKeyStorePassword(), "Keystore password");
+            ObjectHelper.notNull(sslConfiguration.getKeyManagerType(), "Key manager type");
+            ObjectHelper.notNull(sslConfiguration.getKeyStoreType(), "Key store type");
+
+            TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters(sslConfiguration.getSecurityProtocol(),
+                                                                                                                      sslConfiguration.getCipherSuites());
+            sslParams.setKeyStore(sslConfiguration.getKeyStorePath(), sslConfiguration.getKeyStorePassword(), sslConfiguration.getKeyManagerType(),
+                                  sslConfiguration.getKeyStoreType());
+            sslParams.requireClientAuth(sslConfiguration.isRequireClientAuth());
+
+            try {
+                syncServerTransport = TSSLTransportFactory.getServerSocket(configuration.getPort(), configuration.getClientTimeout(), InetAddress.getByName(configuration.getHost()),
+                                                                          sslParams);
+            } catch (UnknownHostException e) {
+                throw new IllegalArgumentException("Unknown host defined: " + configuration.getHost());
+            }
+            ThriftThreadPoolServer.Args args = new ThriftThreadPoolServer.Args(syncServerTransport);
+            args.processor((TProcessor)serverProcessor);
+            args.executorService(getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(this, getEndpoint().getEndpointUri(), configuration.getPoolSize(),
+                                                                                                           configuration.getMaxPoolSize()));
+            args.startThreadPool(getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "start-" + getEndpoint().getEndpointUri()));
+            args.context(endpoint.getCamelContext());
+
+            server = new ThriftThreadPoolServer(args);
+        } else if (configuration.getCompressionType() == ThriftCompressionType.ZLIB && endpoint.isSynchronous()) {
+            if (ObjectHelper.isNotEmpty(configuration.getHost()) && ObjectHelper.isNotEmpty(configuration.getPort())) {
+                LOG.debug("Building sync Thrift server on {}:{}", configuration.getHost(), configuration.getPort());
+                syncServerTransport = new TServerSocket(new InetSocketAddress(configuration.getHost(), configuration.getPort()), configuration.getClientTimeout());
+            } else if (ObjectHelper.isEmpty(configuration.getHost()) && ObjectHelper.isNotEmpty(configuration.getPort())) {
+                LOG.debug("Building sync Thrift server on <any address>:{}", configuration.getPort());
+                syncServerTransport = new TServerSocket(configuration.getPort(), configuration.getClientTimeout());
+            } else {
+                throw new IllegalArgumentException("No server start properties (host, port) specified");
+            }
+            
+            ThriftThreadPoolServer.Args args = new ThriftThreadPoolServer.Args(syncServerTransport);
+            args.processor((TProcessor)serverProcessor);
+            args.transportFactory(new TZlibTransport.Factory());
+            args.executorService(getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(this, getEndpoint().getEndpointUri(), configuration.getPoolSize(),
+                                                                                                           configuration.getMaxPoolSize()));
+            args.startThreadPool(getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "start-" + getEndpoint().getEndpointUri()));
+            args.context(endpoint.getCamelContext());
+            
+            server = new ThriftThreadPoolServer(args);
         } else {
-            throw new IllegalArgumentException("No server start properties (host, port) specified");
-        }
 
-        Args args = new Args(serverTransport);
-        args.processor((TProcessor)serverProcessor);
-        args.executorService(getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(this, getEndpoint().getEndpointUri(), configuration.getPoolSize(),
-                                                                                                       configuration.getMaxPoolSize()));
-        args.startThreadPool(getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "start-" + getEndpoint().getEndpointUri()));
-        args.context(endpoint.getCamelContext());
-        server = new ThriftHsHaServer(args);
+            if (ObjectHelper.isNotEmpty(configuration.getHost()) && ObjectHelper.isNotEmpty(configuration.getPort())) {
+                LOG.debug("Building Thrift server on {}:{}", configuration.getHost(), configuration.getPort());
+                asyncServerTransport = new TNonblockingServerSocket(new InetSocketAddress(configuration.getHost(), configuration.getPort()), configuration.getClientTimeout());
+            } else if (ObjectHelper.isEmpty(configuration.getHost()) && ObjectHelper.isNotEmpty(configuration.getPort())) {
+                LOG.debug("Building Thrift server on <any address>:{}", configuration.getPort());
+                asyncServerTransport = new TNonblockingServerSocket(configuration.getPort(), configuration.getClientTimeout());
+            } else {
+                throw new IllegalArgumentException("No server start properties (host, port) specified");
+            }
+
+            ThriftHsHaServer.Args args = new ThriftHsHaServer.Args(asyncServerTransport);
+            args.processor((TProcessor)serverProcessor);
+            args.executorService(getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(this, getEndpoint().getEndpointUri(), configuration.getPoolSize(),
+                                                                                                           configuration.getMaxPoolSize()));
+            args.startThreadPool(getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "start-" + getEndpoint().getEndpointUri()));
+            args.context(endpoint.getCamelContext());
+            server = new ThriftHsHaServer(args);
+        }
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftExchangeProtocol.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftExchangeProtocol.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftExchangeProtocol.java
new file mode 100644
index 0000000..06ade67
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftExchangeProtocol.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+/**
+ * Exchange protocol serialization types enum
+ */
+public enum ThriftExchangeProtocol {
+    /**
+     * Binary protocol
+     */
+    BINARY,
+    
+    /**
+     * Full JSON protocol
+     */
+    JSON,
+    
+    /**
+     * Simple JSON protocol
+     */
+    SJSON,
+
+    /**
+     * Compact binary protocol
+     */
+    COMPACT
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftNegotiationType.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftNegotiationType.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftNegotiationType.java
new file mode 100644
index 0000000..d5622e8
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftNegotiationType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+/**
+ * Security negotiation types enum
+ */
+public enum ThriftNegotiationType {
+    /**
+     * No security negotiation defined
+     */
+    PLAINTEXT,
+        
+    /**
+     * SSL/TLS security negotiation
+     */
+    SSL,
+
+    /**
+     * SASL security negotiation. Not implemented
+     */
+    SASL
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
index b36ef0c..ad9ca71 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
@@ -26,11 +26,9 @@ import org.apache.camel.component.thrift.client.AsyncClientMethodCallback;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TSSLTransportFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -45,7 +43,6 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
 
     protected final ThriftConfiguration configuration;
     protected final ThriftEndpoint endpoint;
-    private TProtocol protocol;
     private TTransport syncTransport;
     private TNonblockingTransport asyncTransport;
     private Object thriftClient;
@@ -93,17 +90,35 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        if (endpoint.isSynchronous()) {
+        
+        if (configuration.getNegotiationType() == ThriftNegotiationType.SSL) {
+            if (!endpoint.isSynchronous()) {
+                throw new IllegalArgumentException("The SSL negotiation type requires to set syncronous communication mode");
+            }
+            
+            if (syncTransport == null) {
+                initializeSslTransport();
+                LOG.info("Getting synchronous secured client implementation");
+                thriftClient = ThriftUtils.constructClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(),
+                                                                   syncTransport, configuration.getExchangeProtocol(),
+                                                                   configuration.getNegotiationType(), configuration.getCompressionType(),
+                                                                   endpoint.getCamelContext());
+            }
+        } else if (endpoint.isSynchronous()) {
             if (syncTransport == null) {
                 initializeSyncTransport();
                 LOG.info("Getting synchronous client implementation");
-                thriftClient = ThriftUtils.constructClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(), protocol, endpoint.getCamelContext());
+                thriftClient = ThriftUtils.constructClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(),
+                                                                   syncTransport, configuration.getExchangeProtocol(),
+                                                                   configuration.getNegotiationType(), configuration.getCompressionType(),
+                                                                   endpoint.getCamelContext());
             }
         } else {
             if (asyncTransport == null) {
                 initializeAsyncTransport();
                 LOG.info("Getting asynchronous client implementation");
-                thriftClient = ThriftUtils.constructAsyncClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(), asyncTransport, endpoint.getCamelContext());
+                thriftClient = ThriftUtils.constructAsyncClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(), asyncTransport,
+                                                                        configuration.getExchangeProtocol(), endpoint.getCamelContext());
             }
         }
     }
@@ -114,7 +129,6 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
             LOG.debug("Terminating synchronous transport the remote Thrift server");
             syncTransport.close();
             syncTransport = null;
-            protocol = null;
         } else if (asyncTransport != null) {
             LOG.debug("Terminating asynchronous transport the remote Thrift server");
             asyncTransport.close();
@@ -131,7 +145,6 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
             throw new IllegalArgumentException("No connection properties (host, port) specified");
         }
         syncTransport.open();
-        protocol = new TBinaryProtocol(new TFramedTransport(syncTransport));
     }
     
     protected void initializeAsyncTransport() throws IOException, TTransportException {
@@ -143,4 +156,27 @@ public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
         }
     }
     
-}
+    /**
+     * Creates the secured synchronous transport to the configured host and port.
+     * The trust store settings are read from the SSL configuration and the
+     * resulting client socket is stored in {@code syncTransport}.
+     *
+     * @throws IllegalArgumentException if the host/port or the SSL configuration is missing
+     * @throws TTransportException declared by the underlying client socket factory
+     */
+    protected void initializeSslTransport() throws TTransportException {
+        if (ObjectHelper.isEmpty(configuration.getHost()) || ObjectHelper.isEmpty(configuration.getPort())) {
+            throw new IllegalArgumentException("No connection properties (host, port) specified");
+        }
+
+        ThriftSSLConfiguration tlsSettings = configuration.getSslConfiguration();
+        if (tlsSettings == null) {
+            throw new IllegalArgumentException("SSL Configuration must be initialized if negotiation type is set to " + configuration.getNegotiationType());
+        }
+
+        // Every trust related setting has to be present before the socket is created
+        ObjectHelper.notNull(tlsSettings.getSecurityProtocol(), "Security protocol");
+        ObjectHelper.notNull(tlsSettings.getTrustStorePath(), "Trust store path");
+        ObjectHelper.notNull(tlsSettings.getTrustPassword(), "Trust store password");
+        ObjectHelper.notNull(tlsSettings.getTrustManagerType(), "Trust manager type");
+        ObjectHelper.notNull(tlsSettings.getTrustStoreType(), "Trust store type");
+
+        LOG.info("Creating secured transport to the remote Thrift server {}:{}", configuration.getHost(), configuration.getPort());
+
+        TSSLTransportFactory.TSSLTransportParameters trustParams
+            = new TSSLTransportFactory.TSSLTransportParameters(tlsSettings.getSecurityProtocol(), tlsSettings.getCipherSuites());
+        trustParams.setTrustStore(tlsSettings.getTrustStorePath(), tlsSettings.getTrustPassword(),
+                                  tlsSettings.getTrustManagerType(), tlsSettings.getTrustStoreType());
+
+        syncTransport = TSSLTransportFactory.getClientSocket(configuration.getHost(), configuration.getPort(), configuration.getClientTimeout(), trustParams);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftSSLConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftSSLConfiguration.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftSSLConfiguration.java
new file mode 100644
index 0000000..f8ea0ef
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftSSLConfiguration.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+/**
+ * Configuration parameters for SSL/TLS security negotiation
+ */
+@UriParams
+public class ThriftSSLConfiguration {
+
+    @UriParam(label = "security", defaultValue = ThriftConstants.THRIFT_DEFAULT_SECURITY_PROTOCOL)
+    private String securityProtocol = ThriftConstants.THRIFT_DEFAULT_SECURITY_PROTOCOL;
+
+    @UriParam(label = "security")
+    private String[] cipherSuites;
+
+    @UriParam(label = "consumer,security")
+    private String keyStorePath;
+
+    @UriParam(label = "consumer,security", secret = true)
+    private String keyStorePassword;
+
+    @UriParam(label = "consumer,security")
+    private String keyManagerType = javax.net.ssl.KeyManagerFactory.getDefaultAlgorithm();
+
+    @UriParam(label = "consumer,security", defaultValue = ThriftConstants.THRIFT_DEFAULT_SECURITY_STORE_TYPE)
+    private String keyStoreType = ThriftConstants.THRIFT_DEFAULT_SECURITY_STORE_TYPE;
+
+    @UriParam(label = "producer,security")
+    private String trustStorePath;
+
+    @UriParam(label = "producer,security", secret = true)
+    private String trustPassword;
+
+    @UriParam(label = "producer,security")
+    private String trustManagerType = TrustManagerFactory.getDefaultAlgorithm();
+
+    @UriParam(label = "producer,security", defaultValue = ThriftConstants.THRIFT_DEFAULT_SECURITY_STORE_TYPE)
+    private String trustStoreType = ThriftConstants.THRIFT_DEFAULT_SECURITY_STORE_TYPE;
+
+    @UriParam(label = "consumer,security", defaultValue = "false")
+    private boolean requireClientAuth;
+
+    /**
+     * Security negotiation protocol
+     */
+    public String getSecurityProtocol() {
+        return securityProtocol;
+    }
+
+    public void setSecurityProtocol(String protocol) {
+        this.securityProtocol = protocol;
+    }
+
+    /**
+     * Cipher suites array
+     */
+    public String[] getCipherSuites() {
+        return cipherSuites;
+    }
+
+    public void setCipherSuites(String[] cipherSuites) {
+        this.cipherSuites = cipherSuites;
+    }
+
+    /**
+     * Path to the key store file
+     */
+    public String getKeyStorePath() {
+        return keyStorePath;
+    }
+
+    public void setKeyStorePath(String keyStorePath) {
+        this.keyStorePath = keyStorePath;
+    }
+
+    /**
+     * Key store password
+     */
+    public String getKeyStorePassword() {
+        return keyStorePassword;
+    }
+
+    public void setKeyStorePassword(String keyStorePassword) {
+        this.keyStorePassword = keyStorePassword;
+    }
+
+    /**
+     * Key manager algorithm. Defaults to the JVM default KeyManagerFactory algorithm
+     */
+    public String getKeyManagerType() {
+        return keyManagerType;
+    }
+
+    public void setKeyManagerType(String keyManagerType) {
+        this.keyManagerType = keyManagerType;
+    }
+
+    /**
+     * Key store type
+     */
+    public String getKeyStoreType() {
+        return keyStoreType;
+    }
+
+    public void setKeyStoreType(String keyStoreType) {
+        this.keyStoreType = keyStoreType;
+    }
+
+    /**
+     * Path to the trust store file
+     */
+    public String getTrustStorePath() {
+        return trustStorePath;
+    }
+
+    public void setTrustStorePath(String trustStorePath) {
+        this.trustStorePath = trustStorePath;
+    }
+
+    /**
+     * Trust store password
+     */
+    public String getTrustPassword() {
+        return trustPassword;
+    }
+
+    public void setTrustPassword(String trustPassword) {
+        this.trustPassword = trustPassword;
+    }
+
+    /**
+     * Trust manager algorithm. Defaults to the JVM default TrustManagerFactory algorithm
+     */
+    public String getTrustManagerType() {
+        return trustManagerType;
+    }
+
+    public void setTrustManagerType(String trustManagerType) {
+        this.trustManagerType = trustManagerType;
+    }
+
+    /**
+     * Trust store type
+     */
+    public String getTrustStoreType() {
+        return trustStoreType;
+    }
+
+    public void setTrustStoreType(String trustStoreType) {
+        this.trustStoreType = trustStoreType;
+    }
+
+    /**
+     * Set if client authentication is required
+     */
+    public boolean isRequireClientAuth() {
+        return requireClientAuth;
+    }
+
+    public void setRequireClientAuth(boolean requireClientAuth) {
+        this.requireClientAuth = requireClientAuth;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
index 8d66179..58aa77b 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
@@ -33,9 +33,15 @@ import org.apache.camel.util.ReflectionHelper;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.async.TAsyncClientManager;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TZlibTransport;
 
 /**
  * ThriftUtils helpers are working with dynamic methods via Camel and Java
@@ -55,10 +61,11 @@ public final class ThriftUtils {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public static Object constructClientInstance(String packageName, String serviceName, TProtocol protocol, final CamelContext context) {
+    public static Object constructClientInstance(String packageName, String serviceName, TTransport transport, ThriftExchangeProtocol exchangeProtocol,
+                                                 final ThriftNegotiationType negotiationType, final ThriftCompressionType compressionType, final CamelContext context) {
         Object clientInstance = null;
         Class[] constructorParamTypes = {TProtocol.class};
-        Object[] constructorParamValues = {protocol};
+        Object[] constructorParamValues = {constructSyncProtocol(transport, exchangeProtocol, negotiationType, compressionType)};
 
         String clientClassName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_SYNC_CLIENT_CLASS_NAME;
         try {
@@ -77,7 +84,7 @@ public final class ThriftUtils {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    public static Object constructAsyncClientInstance(String packageName, String serviceName, TNonblockingTransport transport, final CamelContext context) {
+    public static Object constructAsyncClientInstance(String packageName, String serviceName, TNonblockingTransport transport, ThriftExchangeProtocol exchangeProtocol, final CamelContext context) {
         Object asynClientInstance = null;
         Class[] getterParamTypes = {TNonblockingTransport.class};
         Class[] constructorParamTypes = {TAsyncClientManager.class, TProtocolFactory.class};
@@ -86,7 +93,7 @@ public final class ThriftUtils {
         try {
             Class clientClass = context.getClassResolver().resolveMandatoryClass(clientClassName);
             Constructor factoryConstructor = clientClass.getConstructor(constructorParamTypes);
-            Object factoryInstance = factoryConstructor.newInstance(new TAsyncClientManager(), new TBinaryProtocol.Factory());
+            Object factoryInstance = factoryConstructor.newInstance(new TAsyncClientManager(), constructAsyncProtocol(exchangeProtocol));
             Method asyncClientGetter = ReflectionHelper.findMethod(clientClass, ThriftConstants.THRIFT_ASYNC_CLIENT_GETTER_NAME, getterParamTypes);
             if (asyncClientGetter == null) {
                 throw new IllegalArgumentException("Thrift async client getter not found: " + clientClassName + "." + ThriftConstants.THRIFT_ASYNC_CLIENT_GETTER_NAME);
@@ -174,9 +181,47 @@ public final class ThriftUtils {
         }
         return processorInstance;
     }
+    
+    /**
+     * Builds the protocol used by a synchronous client on top of the given transport.
+     * <p>
+     * With SSL negotiation the binary protocol is bound directly to the transport, and
+     * with ZLIB compression it is bound to a zlib-wrapped transport; in both cases the
+     * exchange protocol is not consulted. Otherwise the exchange protocol selects the
+     * protocol implementation, which is bound to a framed transport.
+     *
+     * @throws IllegalArgumentException if the exchange protocol is not one of the handled values
+     */
+    private static TProtocol constructSyncProtocol(TTransport transport, ThriftExchangeProtocol exchangeProtocol,
+                                                   final ThriftNegotiationType negotiationType, final ThriftCompressionType compressionType) {
+        // Over SSL/TLS only the binary protocol is supported
+        if (negotiationType == ThriftNegotiationType.SSL) {
+            return new TBinaryProtocol(transport);
+        }
+
+        if (compressionType == ThriftCompressionType.ZLIB) {
+            return new TBinaryProtocol(new TZlibTransport(transport));
+        }
+
+        switch (exchangeProtocol) {
+        case BINARY:
+            return new TBinaryProtocol(new TFramedTransport(transport));
+        case JSON:
+            return new TJSONProtocol(new TFramedTransport(transport));
+        case SJSON:
+            return new TSimpleJSONProtocol(new TFramedTransport(transport));
+        case COMPACT:
+            return new TCompactProtocol(new TFramedTransport(transport));
+        default:
+            throw new IllegalArgumentException("Exchange protocol " + exchangeProtocol + " not implemented");
+        }
+    }
+    
+    private static TProtocolFactory constructAsyncProtocol(ThriftExchangeProtocol exchangeProtocol) {
+        switch (exchangeProtocol) {
+        case BINARY:
+            return new TBinaryProtocol.Factory();
+        case JSON:
+            return new TJSONProtocol.Factory();
+        case SJSON:
+            return new TSimpleJSONProtocol.Factory();
+        case COMPACT:
+            return new TCompactProtocol.Factory();
+        default:
+            throw new IllegalArgumentException("Exchange protocol " + exchangeProtocol + " not implemented");    
+        }
+    }
 
-    /*
-     * This function find onComplete method inside interface implementation and
+    /**
+     * The function find onComplete method inside interface implementation and
      * get fist parameter (but not Object.class) as return type
      */
     @SuppressWarnings("rawtypes")
@@ -198,9 +243,9 @@ public final class ThriftUtils {
         return "(" + joiner.toString() + ")";
     }
 
-    /*
-     * This function transforms objects types stored as list or simple object
-     * inside Body to the primitives objects to find appropriate method
+    /**
+     * The function transforms the object types stored as a list or a single object
+     * inside the Body to primitive types in order to find the appropriate method
      */
     @SuppressWarnings({"rawtypes"})
     private static Object[] convertObjects2Primitives(Object request, AsyncClientMethodCallback methodCallback) {
@@ -258,4 +303,4 @@ public final class ThriftUtils {
         }
         return new Object[] {paramsTypes, paramsValues};
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
index 09d22a48..2ae49e1 100644
--- a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
@@ -40,28 +40,16 @@ public class ThriftHsHaServer extends TNonblockingServer {
             super(transport);
         }
 
-        public ExecutorService getExecutorService() {
-            return executorService;
-        }
-
         public Args executorService(ExecutorService executorService) {
             this.executorService = executorService;
             return this;
         }
 
-        public ExecutorService getStartThreadPool() {
-            return startThreadPool;
-        }
-
         public Args startThreadPool(ExecutorService startThreadPool) {
             this.startThreadPool = startThreadPool;
             return this;
         }
 
-        public CamelContext getContext() {
-            return context;
-        }
-
         public Args context(CamelContext context) {
             this.context = context;
             return this;

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java
new file mode 100644
index 0000000..1b749c9
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftThreadPoolServer.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift.server;
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TSaslTransportException;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Thrift ThreadPoolServer implementation with executors controlled by the Camel Executor Service Manager
+ */
+public class ThriftThreadPoolServer extends TServer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftThreadPoolServer.class.getName());
+
+    public static class Args extends AbstractServerArgs<Args> {
+        private ExecutorService executorService;
+        private ExecutorService startThreadPool;
+        private CamelContext context;
+
+        private int requestTimeout = 20;
+        private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
+        private int beBackoffSlotLength = 100;
+        private TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
+
+        public Args(TServerTransport transport) {
+            super(transport);
+        }
+
+        public Args requestTimeout(int n) {
+            requestTimeout = n;
+            return this;
+        }
+
+        public Args requestTimeoutUnit(TimeUnit tu) {
+            requestTimeoutUnit = tu;
+            return this;
+        }
+
+        // Binary exponential backoff slot length
+        public Args beBackoffSlotLength(int n) {
+            beBackoffSlotLength = n;
+            return this;
+        }
+
+        // Binary exponential backoff slot time unit
+        public Args beBackoffSlotLengthUnit(TimeUnit tu) {
+            beBackoffSlotLengthUnit = tu;
+            return this;
+        }
+
+        public Args executorService(ExecutorService executorService) {
+            this.executorService = executorService;
+            return this;
+        }
+
+        public Args startThreadPool(ExecutorService startThreadPool) {
+            this.startThreadPool = startThreadPool;
+            return this;
+        }
+
+        public Args context(CamelContext context) {
+            this.context = context;
+            return this;
+        }
+    }
+
+    // Executor service for handling client connections
+    private final ExecutorService invoker;
+    private final CamelContext context;
+    private final ExecutorService startExecutor;
+
+    private final TimeUnit requestTimeoutUnit;
+
+    private final long requestTimeout;
+
+    private final long beBackoffSlotInMillis;
+
+    private Random random = new Random(System.currentTimeMillis());
+
+    public ThriftThreadPoolServer(Args args) {
+        super(args);
+
+        requestTimeoutUnit = args.requestTimeoutUnit;
+        requestTimeout = args.requestTimeout;
+        beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
+
+        context = args.context;
+        invoker = args.executorService;
+        startExecutor = args.startThreadPool;
+    }
+
+    public void serve() {
+        try {
+            serverTransport_.listen();
+        } catch (TTransportException ttx) {
+            LOGGER.error("Error occurred during listening.", ttx);
+            return;
+        }
+
+        // Run the preServe event
+        if (eventHandler_ != null) {
+            eventHandler_.preServe();
+        }
+
+        startExecutor.execute(() -> {
+            stopped_ = false;
+            setServing(true);
+            
+            waitForShutdown();
+            
+            context.getExecutorServiceManager().shutdownGraceful(invoker);
+            setServing(false);
+        });
+    }
+
+    public void waitForShutdown() {
+        int failureCount = 0;
+        while (!stopped_) {
+            try {
+                TTransport client = serverTransport_.accept();
+                WorkerProcess wp = new WorkerProcess(client);
+
+                int retryCount = 0;
+                long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
+                while (true) {
+                    try {
+                        invoker.execute(wp);
+                        break;
+                    } catch (Throwable t) {
+                        if (t instanceof RejectedExecutionException) {
+                            retryCount++;
+                            try {
+                                if (remainTimeInMillis > 0) {
+                                    // do a truncated 20 binary exponential
+                                    // backoff sleep
+                                    long sleepTimeInMillis = ((long)(random.nextDouble() * (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
+                                    sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
+                                    TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
+                                    remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
+                                } else {
+                                    client.close();
+                                    wp = null;
+                                    LOGGER.warn("Task has been rejected by ExecutorService " + retryCount + " times till timedout, reason: " + t);
+                                    break;
+                                }
+                            } catch (InterruptedException e) {
+                                LOGGER.warn("Interrupted while waiting to place client on executor queue.");
+                                Thread.currentThread().interrupt();
+                                break;
+                            }
+                        } else if (t instanceof Error) {
+                            LOGGER.error("ExecutorService threw error: " + t, t);
+                            throw (Error)t;
+                        } else {
+                            // for other possible runtime errors from
+                            // ExecutorService, should also not kill serve
+                            LOGGER.warn("ExecutorService threw error: " + t, t);
+                            break;
+                        }
+                    }
+                }
+            } catch (TTransportException ttx) {
+                if (!stopped_) {
+                    ++failureCount;
+                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+                }
+            }
+        }
+    }
+
+    public void stop() {
+        stopped_ = true;
+        serverTransport_.interrupt();
+        context.getExecutorServiceManager().shutdownGraceful(startExecutor);
+    }
+
+    private final class WorkerProcess implements Runnable {
+
+        /**
+         * Client that this services.
+         */
+        private TTransport client;
+
+        /**
+         * Default constructor.
+         *
+         * @param client Transport to process
+         */
+        private WorkerProcess(TTransport client) {
+            this.client = client;
+        }
+
+        /**
+         * Loops on processing a client forever
+         */
+        public void run() {
+            TProcessor processor = null;
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+
+            TServerEventHandler eventHandler = null;
+            ServerContext connectionContext = null;
+
+            try {
+                processor = processorFactory_.getProcessor(client);
+                inputTransport = inputTransportFactory_.getTransport(client);
+                outputTransport = outputTransportFactory_.getTransport(client);
+                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+
+                eventHandler = getEventHandler();
+                if (eventHandler != null) {
+                    connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
+                }
+                // we check stopped_ first to make sure we're not supposed to be
+                // shutting
+                // down. this is necessary for graceful shutdown.
+                while (true) {
+
+                    if (eventHandler != null) {
+                        eventHandler.processContext(connectionContext, inputTransport, outputTransport);
+                    }
+
+                    if (stopped_ || !processor.process(inputProtocol, outputProtocol)) {
+                        break;
+                    }
+                }
+            } catch (TSaslTransportException ttx) {
+                // Something thats not SASL was in the stream, continue silently
+            } catch (TTransportException ttx) {
+                // Assume the client died and continue silently
+            } catch (TException tx) {
+                LOGGER.error("Thrift error occurred during processing of message.", tx);
+            } catch (Exception x) {
+                LOGGER.error("Error occurred during processing of message.", x);
+            } finally {
+                if (eventHandler != null) {
+                    eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+                }
+                if (inputTransport != null) {
+                    inputTransport.close();
+                }
+                if (outputTransport != null) {
+                    outputTransport.close();
+                }
+                if (client.isOpen()) {
+                    client.close();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java
new file mode 100644
index 0000000..5bdebb3
--- /dev/null
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerSecurityTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.thrift.generated.Calculator;
+import org.apache.camel.component.thrift.generated.Operation;
+import org.apache.camel.component.thrift.generated.Work;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Verifies that a Thrift consumer route secured with SSL serves calls made by an SSL Thrift client. */
+public class ThriftConsumerSecurityTest extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerSecurityTest.class);
+    private static final int THRIFT_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int THRIFT_TEST_NUM1 = 12;
+    private static final int THRIFT_TEST_NUM2 = 13;
+    private static final String TRUST_STORE_PATH = "src/test/resources/certs/truststore.jks";
+    private static final String KEY_STORE_PATH = "src/test/resources/certs/keystore.jks";
+    private static final String SECURITY_STORE_PASSWORD = "camelinaction";
+    private static final int THRIFT_CLIENT_TIMEOUT = 2000;
+
+    // Client state is rebuilt in startThriftSecureClient(), so it is held per instance rather than in a static
+    private Calculator.Client thriftClient;
+    private TProtocol protocol;
+    private TTransport transport;
+
+    /** Opens an SSL client socket to the test port and builds the binary-protocol Calculator client on it. */
+    @Before
+    public void startThriftSecureClient() throws IOException, TTransportException {
+        if (transport == null) {
+            LOG.info("Connecting to the secured Thrift server on port: {}", THRIFT_TEST_PORT);
+            TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters();
+            sslParams.setTrustStore(TRUST_STORE_PATH, SECURITY_STORE_PASSWORD);
+            transport = TSSLTransportFactory.getClientSocket("localhost", THRIFT_TEST_PORT, THRIFT_CLIENT_TIMEOUT, sslParams);
+
+            protocol = new TBinaryProtocol(transport);
+            thriftClient = new Calculator.Client(protocol);
+            LOG.info("Connected to the secured Thrift server");
+        }
+    }
+
+    /** Closes the client transport, if one was opened, and clears the reference. */
+    @After
+    public void stopThriftClient() throws Exception {
+        if (transport != null) {
+            transport.close();
+            transport = null;
+            LOG.info("Connection to the Thrift server closed");
+        }
+    }
+
+    /** Registers the key store settings under "sslConfig", the name the route URI refers to. */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        ThriftSSLConfiguration sslConfig = new ThriftSSLConfiguration();
+        sslConfig.setKeyStorePath(KEY_STORE_PATH);
+        sslConfig.setKeyStorePassword(SECURITY_STORE_PASSWORD);
+        jndi.bind("sslConfig", sslConfig);
+        return jndi;
+    }
+
+    /** Calls calculate() over SSL; expects one "calculate" exchange on the mock and the product as the reply. */
+    @Test
+    public void testCalculateMethodInvocation() throws Exception {
+        LOG.info("Test Calculate method invocation");
+        // Register the expectations before invoking, so the mock is armed when the exchange arrives
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-secure-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "calculate");
+
+        Work work = new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY);
+        int calculateResult = thriftClient.calculate(1, work);
+        mockEndpoint.assertIsSatisfied();
+        assertEquals(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2, calculateResult);
+    }
+
+    /** Calls echo() over SSL; expects one "echo" exchange on the mock and the sent values in the reply. */
+    @Test
+    public void testEchoMethodInvocation() throws Exception {
+        LOG.info("Test Echo method invocation");
+        // Register the expectations before invoking, so the mock is armed when the exchange arrives
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-secure-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "echo");
+
+        Work echoResult = thriftClient.echo(new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY));
+        mockEndpoint.assertIsSatisfied();
+        assertNotNull(echoResult);
+        assertEquals(THRIFT_TEST_NUM1, echoResult.num1);
+        assertEquals(Operation.MULTIPLY, echoResult.op);
+    }
+
+    /** Builds the SSL consumer route: every call is sent to the mock, then the reply is set per method name. */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("thrift://localhost:" + THRIFT_TEST_PORT + "/org.apache.camel.component.thrift.generated.Calculator?negotiationType=SSL&sslConfiguration=#sslConfig&synchronous=true")
+                    .to("mock:thrift-secure-service").choice()
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate")).setBody(simple(Integer.toString(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2)))
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("echo")).setBody(simple("${body[0]}")).bean(new CalculatorMessageBuilder(), "echo");
+            }
+        };
+    }
+
+    /** Bean the route invokes for "echo"; it replies with a deep copy of the received Work. */
+    public class CalculatorMessageBuilder {
+        public Work echo(Work work) {
+            return work.deepCopy();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerZlibCompressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerZlibCompressionTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerZlibCompressionTest.java
new file mode 100644
index 0000000..5ffb826
--- /dev/null
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerZlibCompressionTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.thrift.generated.Calculator;
+import org.apache.camel.component.thrift.generated.Operation;
+import org.apache.camel.component.thrift.generated.Work;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TZlibTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Verifies that a Thrift consumer route using zlib compression serves calls made by a zlib Thrift client. */
+public class ThriftConsumerZlibCompressionTest extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerZlibCompressionTest.class);
+    private static final int THRIFT_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int THRIFT_TEST_NUM1 = 12;
+    private static final int THRIFT_TEST_NUM2 = 13;
+    private static final int THRIFT_CLIENT_TIMEOUT = 2000;
+    // Client state is rebuilt in startThriftZlibClient(), so it is held per instance rather than in a static
+    private Calculator.Client thriftClient;
+    private TProtocol protocol;
+    private TTransport transport;
+
+    /** Opens a TSocket to the test port and builds the Calculator client over a zlib-wrapped binary protocol. */
+    @Before
+    public void startThriftZlibClient() throws IOException, TTransportException {
+        if (transport == null) {
+            LOG.info("Connecting to the Thrift server with zlib compression on port: {}", THRIFT_TEST_PORT);
+            transport = new TSocket("localhost", THRIFT_TEST_PORT, THRIFT_CLIENT_TIMEOUT);
+            protocol = new TBinaryProtocol(new TZlibTransport(transport));
+            thriftClient = new Calculator.Client(protocol);
+            transport.open();
+            LOG.info("Connected to the Thrift server with zlib compression");
+        }
+    }
+
+    /** Closes the client socket, if one was opened, and clears the reference. */
+    @After
+    public void stopThriftClient() throws Exception {
+        if (transport != null) {
+            transport.close();
+            transport = null;
+            LOG.info("Connection to the Thrift server closed");
+        }
+    }
+
+    /** Calls calculate() with zlib; expects one "calculate" exchange on the mock and the product as the reply. */
+    @Test
+    public void testCalculateMethodInvocation() throws Exception {
+        LOG.info("Test Calculate method invocation");
+        // Register the expectations before invoking, so the mock is armed when the exchange arrives
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-secure-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "calculate");
+
+        Work work = new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY);
+        int calculateResult = thriftClient.calculate(1, work);
+        mockEndpoint.assertIsSatisfied();
+        assertEquals(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2, calculateResult);
+    }
+
+    /** Calls echo() with zlib; expects one "echo" exchange on the mock and the sent values in the reply. */
+    @Test
+    public void testEchoMethodInvocation() throws Exception {
+        LOG.info("Test Echo method invocation");
+        // Register the expectations before invoking, so the mock is armed when the exchange arrives
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-secure-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "echo");
+
+        Work echoResult = thriftClient.echo(new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY));
+        mockEndpoint.assertIsSatisfied();
+        assertNotNull(echoResult);
+        assertEquals(THRIFT_TEST_NUM1, echoResult.num1);
+        assertEquals(Operation.MULTIPLY, echoResult.op);
+    }
+
+    /** Builds the zlib consumer route: every call is sent to the mock, then the reply is set per method name. */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("thrift://localhost:" + THRIFT_TEST_PORT + "/org.apache.camel.component.thrift.generated.Calculator?compressionType=ZLIB&synchronous=true")
+                    .to("mock:thrift-secure-service").choice()
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate")).setBody(simple(Integer.toString(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2)))
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("echo")).setBody(simple("${body[0]}")).bean(new CalculatorMessageBuilder(), "echo");
+            }
+        };
+    }
+
+    /** Bean the route invokes for "echo"; it replies with a deep copy of the received Work. */
+    public class CalculatorMessageBuilder {
+        public Work echo(Work work) {
+            return work.deepCopy();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5a501706/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerBaseTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerBaseTest.java
index b0e3228..e59d927 100644
--- a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerBaseTest.java
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftProducerBaseTest.java
@@ -17,18 +17,11 @@
 package org.apache.camel.component.thrift;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.camel.component.thrift.generated.Calculator;
-import org.apache.camel.component.thrift.generated.InvalidOperation;
-import org.apache.camel.component.thrift.generated.Work;
+import org.apache.camel.component.thrift.impl.CalculatorSyncServerImpl;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.THsHaServer.Args;
 import org.apache.thrift.server.TServer;
@@ -42,13 +35,12 @@ public class ThriftProducerBaseTest extends CamelTestSupport {
     protected static final int THRIFT_TEST_PORT = AvailablePortFinder.getNextAvailable();
     protected static final int THRIFT_TEST_NUM1 = 12;
     protected static final int THRIFT_TEST_NUM2 = 13;
+    @SuppressWarnings({"rawtypes"})
+    protected static Calculator.Processor processor;
 
     private static final Logger LOG = LoggerFactory.getLogger(ThriftProducerBaseTest.class);
-
     private static TNonblockingServerSocket serverTransport;
     private static TServer server;
-    @SuppressWarnings({"rawtypes"})
-    private static Calculator.Processor processor;
 
     @BeforeClass
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -73,132 +65,4 @@ public class ThriftProducerBaseTest extends CamelTestSupport {
             LOG.info("Thrift server stoped");
         }
     }
-
-    /**
-     * Test Thrift Calculator blocking server implementation
-     */
-    public static class CalculatorSyncServerImpl implements Calculator.Iface {
-
-        @Override
-        public void ping() throws TException {
-        }
-
-        @Override
-        public int add(int num1, int num2) throws TException {
-            return num1 + num2;
-        }
-
-        @Override
-        public int calculate(int logId, Work work) throws InvalidOperation, TException {
-            int val = 0;
-            switch (work.op) {
-            case ADD:
-                val = work.num1 + work.num2;
-                break;
-            case SUBTRACT:
-                val = work.num1 - work.num2;
-                break;
-            case MULTIPLY:
-                val = work.num1 * work.num2;
-                break;
-            case DIVIDE:
-                if (work.num2 == 0) {
-                    InvalidOperation io = new InvalidOperation();
-                    io.whatOp = work.op.getValue();
-                    io.why = "Cannot divide by 0";
-                    throw io;
-                }
-                val = work.num1 / work.num2;
-                break;
-            default:
-                InvalidOperation io = new InvalidOperation();
-                io.whatOp = work.op.getValue();
-                io.why = "Unknown operation";
-                throw io;
-            }
-
-            return val;
-        }
-
-        @Override
-        public void zip() throws TException {
-        }
-
-        @Override
-        public Work echo(Work w) throws TException {
-            return w.deepCopy();
-        }
-
-        @Override
-        public int alltypes(boolean v1, byte v2, short v3, int v4, long v5, double v6, String v7, ByteBuffer v8, Work v9, List<Integer> v10, Set<String> v11, Map<String, Long> v12)
-            throws TException {
-            return 1;
-        }
-    }
-
-    /**
-     * Test Thrift Calculator nonblocking server implementation
-     */
-    public static class CalculatorAsyncServerImpl implements Calculator.AsyncIface {
-
-        @Override
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public void ping(AsyncMethodCallback resultHandler) throws TException {
-            resultHandler.onComplete(new Object());
-        }
-
-        @Override
-        public void add(int num1, int num2, AsyncMethodCallback<Integer> resultHandler) throws TException {
-            resultHandler.onComplete(new Integer(num1 + num2));
-        }
-
-        @Override
-        public void calculate(int logid, Work work, AsyncMethodCallback<Integer> resultHandler) throws TException {
-            int val = 0;
-            switch (work.op) {
-            case ADD:
-                val = work.num1 + work.num2;
-                break;
-            case SUBTRACT:
-                val = work.num1 - work.num2;
-                break;
-            case MULTIPLY:
-                val = work.num1 * work.num2;
-                break;
-            case DIVIDE:
-                if (work.num2 == 0) {
-                    InvalidOperation io = new InvalidOperation();
-                    io.whatOp = work.op.getValue();
-                    io.why = "Cannot divide by 0";
-                    resultHandler.onError(io);
-                }
-                val = work.num1 / work.num2;
-                break;
-            default:
-                InvalidOperation io = new InvalidOperation();
-                io.whatOp = work.op.getValue();
-                io.why = "Unknown operation";
-                resultHandler.onError(io);
-            }
-            resultHandler.onComplete(val);
-        }
-
-        @Override
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public void zip(AsyncMethodCallback resultHandler) throws TException {
-            resultHandler.onComplete(new Object());
-        }
-
-        @Override
-        public void echo(Work w, AsyncMethodCallback<Work> resultHandler) throws TException {
-            resultHandler.onComplete(w.deepCopy());
-        }
-
-        @Override
-        public void alltypes(boolean v1, byte v2, short v3, int v4, long v5, double v6, String v7, ByteBuffer v8, Work v9, List<Integer> v10, Set<String> v11,
-                             Map<String, Long> v12, AsyncMethodCallback<Integer> resultHandler)
-            throws TException {
-            resultHandler.onComplete(new Integer(1));
-        }
-    }
 }