You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/11 09:44:11 UTC

[camel] 01/02: CAMEL-14052: camel-paho make it possible to configure all its options via component options that also works for Spring Boot

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 65885bee95091034b3625690f2ef1bebf649089a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Oct 11 10:52:03 2019 +0200

    CAMEL-14052: camel-paho make it possible to configure all its options via component options that also works for Spring Boot
---
 .../apache/camel/component/paho/PahoComponent.java |  91 +---
 .../camel/component/paho/PahoConfiguration.java    | 581 +++++++++++++++++++++
 .../apache/camel/component/paho/PahoConsumer.java  |   4 +-
 .../apache/camel/component/paho/PahoEndpoint.java  | 264 +++-------
 .../apache/camel/component/paho/PahoProducer.java  |   4 +-
 .../camel/component/paho/PahoComponentTest.java    |  34 +-
 .../component/paho/PahoOverrideTopicTest.java      |   5 -
 7 files changed, 675 insertions(+), 308 deletions(-)

diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index d045bbc..30f43ce 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -23,7 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 
 /**
  * Component to integrate with the Eclipse Paho MQTT library.
@@ -31,15 +31,11 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 @Component("paho")
 public class PahoComponent extends DefaultComponent {
 
-    private String brokerUrl;
-    private String clientId;
-    @Metadata(label = "security", secret = true)
-    private String userName;
-    @Metadata(label = "security", secret = true)
-    private String password;
+    private PahoConfiguration configuration = new PahoConfiguration();
+
     @Metadata(label = "advanced")
-    private MqttConnectOptions connectOptions;
-    
+    private MqttClient client;
+
     public PahoComponent() {
         this(null);
     }
@@ -52,83 +48,48 @@ public class PahoComponent extends DefaultComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        PahoEndpoint answer = new PahoEndpoint(uri, remaining, this);
+        // Each endpoint can have its own configuration so make
+        // a copy of the configuration
+        PahoConfiguration configuration = getConfiguration().copy();
 
-        if (brokerUrl != null) {
-            answer.setBrokerUrl(brokerUrl);
-        }
-        if (clientId != null) {
-            answer.setClientId(clientId);
-        }
-        if (userName != null) {
-            answer.setUserName(userName);
-        }
-        if (password != null) {
-            answer.setPassword(password);
-        }
-        if (connectOptions != null) {
-            answer.setConnectOptions(connectOptions);
-        }
+        PahoEndpoint answer = new PahoEndpoint(uri, remaining, this, configuration);
+        answer.setClient(client);
 
+        setProperties(configuration, parameters);
         setProperties(answer, parameters);
         return answer;
     }
 
-    // Getters and setters
-
-    public String getBrokerUrl() {
-        return brokerUrl;
+    public PahoConfiguration getConfiguration() {
+        return configuration;
     }
 
     /**
-     * The URL of the MQTT broker.
+     * To use the shared Paho configuration
      */
-    public void setBrokerUrl(String brokerUrl) {
-        this.brokerUrl = brokerUrl;
+    public void setConfiguration(PahoConfiguration configuration) {
+        this.configuration = configuration;
     }
 
-    public String getClientId() {
-        return clientId;
+    public MqttClient getClient() {
+        return client;
     }
 
     /**
-     * MQTT client identifier.
+     * To use a shared Paho client
      */
-    public void setClientId(String clientId) {
-        this.clientId = clientId;
-    }
-
-    public String getUserName() {
-        return userName;
+    public void setClient(MqttClient client) {
+        this.client = client;
     }
 
-    /**
-     * Username to be used for authentication against the MQTT broker
-     */
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    /**
-     * Password to be used for authentication against the MQTT broker
-     */
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public MqttConnectOptions getConnectOptions() {
-        return connectOptions;
+    public String getBrokerUrl() {
+        return configuration.getBrokerUrl();
     }
 
     /**
-     * Client connection options
+     * The URL of the MQTT broker.
      */
-    public void setConnectOptions(MqttConnectOptions connectOptions) {
-        this.connectOptions = connectOptions;
+    public void setBrokerUrl(String brokerUrl) {
+        configuration.setBrokerUrl(brokerUrl);
     }
-    
 }
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
new file mode 100644
index 0000000..96eb341
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+import java.util.Properties;
+import javax.net.SocketFactory;
+import javax.net.ssl.HostnameVerifier;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+@UriParams
+public class PahoConfiguration implements Cloneable {
+
+    @UriParam
+    private String clientId = "camel-" + System.nanoTime();
+    @UriParam(defaultValue = PahoConstants.DEFAULT_BROKER_URL)
+    private String brokerUrl = PahoConstants.DEFAULT_BROKER_URL;
+    @UriParam(defaultValue = "2")
+    private int qos = PahoConstants.DEFAULT_QOS;
+    @UriParam
+    private boolean retained;
+    @UriParam(defaultValue = "MEMORY")
+    private PahoPersistence persistence = PahoPersistence.MEMORY;
+    @UriParam
+    private String filePersistenceDirectory;
+
+    @UriParam(defaultValue = "60")
+    private int keepAliveInterval = 60;
+    @UriParam(defaultValue = "10")
+    private int maxInflight = 10;
+    @UriParam
+    private String willTopic;
+    @UriParam
+    private String willPayload;
+    @UriParam
+    private int willQos;
+    @UriParam
+    private boolean willRetained;
+    @UriParam(label = "security") @Metadata(secret = true)
+    private String userName;
+    @UriParam(label = "security") @Metadata(secret = true)
+    private String password;
+    @UriParam(label = "security")
+    private SocketFactory socketFactory;
+    @UriParam(label = "security")
+    private Properties sslClientProps;
+    @UriParam(label = "security", defaultValue = "true")
+    private boolean httpsHostnameVerificationEnabled = true;
+    @UriParam(label = "security")
+    private HostnameVerifier sslHostnameVerifier;
+    @UriParam(defaultValue = "true")
+    private boolean cleanSession = true;
+    @UriParam(defaultValue = "30")
+    private int connectionTimeout = 30;
+    @UriParam
+    private String serverURIs;
+    @UriParam
+    private int mqttVersion;
+    @UriParam(defaultValue = "true")
+    private boolean automaticReconnect = true;
+    @UriParam(defaultValue = "128000")
+    private int maxReconnectDelay = 128000;
+    @UriParam(label = "advanced")
+    private Properties customWebSocketHeaders;
+    @UriParam(label = "advanced", defaultValue = "1")
+    private int executorServiceTimeout = 1;
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    /**
+     * MQTT client identifier.
+     */
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    /**
+     * The URL of the MQTT broker.
+     */
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public int getQos() {
+        return qos;
+    }
+
+    /**
+     * Client quality of service level (0-2).
+     */
+    public void setQos(int qos) {
+        this.qos = qos;
+    }
+
+    public boolean isRetained() {
+        return retained;
+    }
+
+    /**
+     * Retain option
+     */
+    public void setRetained(boolean retained) {
+        this.retained = retained;
+    }
+
+    public PahoPersistence getPersistence() {
+        return persistence;
+    }
+
+    /**
+     * Client persistence to be used - memory or file.
+     */
+    public void setPersistence(PahoPersistence persistence) {
+        this.persistence = persistence;
+    }
+
+    public String getFilePersistenceDirectory() {
+        return filePersistenceDirectory;
+    }
+
+    /**
+     * Base directory used by file persistence. Will by default use user directory.
+     */
+    public void setFilePersistenceDirectory(String filePersistenceDirectory) {
+        this.filePersistenceDirectory = filePersistenceDirectory;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * Username to be used for authentication against the MQTT broker
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Password to be used for authentication against the MQTT broker
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getKeepAliveInterval() {
+        return keepAliveInterval;
+    }
+
+    /**
+     * Sets the keep alive interval. This value, measured in seconds, defines the
+     * maximum time interval between messages sent or received. It enables the
+     * client to detect if the server is no longer available, without having to wait
+     * for the TCP/IP timeout. The client will ensure that at least one message
+     * travels across the network within each keep alive period. In the absence of a
+     * data-related message during the time period, the client sends a very small
+     * ping message, which the server will acknowledge. A value of 0 disables
+     * keepalive processing in the client.
+     * <p>
+     * The default value is 60 seconds
+     * </p>
+     */
+    public void setKeepAliveInterval(int keepAliveInterval) {
+        this.keepAliveInterval = keepAliveInterval;
+    }
+
+    public int getMaxInflight() {
+        return maxInflight;
+    }
+
+    /**
+     * Sets the max inflight. Please increase this value in a high traffic
+     * environment.
+     * <p>
+     * The default value is 10
+     * </p>
+     */
+    public void setMaxInflight(int maxInflight) {
+        this.maxInflight = maxInflight;
+    }
+
+    public String getWillTopic() {
+        return willTopic;
+    }
+
+    /**
+     * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+     * that this client unexpectedly loses its connection to the server, the server
+     * will publish a message to itself using the supplied details.
+     *
+     * The topic to publish to
+     * The byte payload for the message.
+     * The quality of service to publish the message at (0, 1 or 2).
+     * Whether or not the message should be retained.
+     */
+    public void setWillTopic(String willTopic) {
+        this.willTopic = willTopic;
+    }
+
+    public String getWillPayload() {
+        return willPayload;
+    }
+
+    /**
+     * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+     * that this client unexpectedly loses its connection to the server, the server
+     * will publish a message to itself using the supplied details.
+     *
+     * The topic to publish to
+     * The byte payload for the message.
+     * The quality of service to publish the message at (0, 1 or 2).
+     * Whether or not the message should be retained.
+     */
+    public void setWillPayload(String willPayload) {
+        this.willPayload = willPayload;
+    }
+
+    public int getWillQos() {
+        return willQos;
+    }
+
+    /**
+     * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+     * that this client unexpectedly loses its connection to the server, the server
+     * will publish a message to itself using the supplied details.
+     *
+     * The topic to publish to
+     * The byte payload for the message.
+     * The quality of service to publish the message at (0, 1 or 2).
+     * Whether or not the message should be retained.
+     */
+    public void setWillQos(int willQos) {
+        this.willQos = willQos;
+    }
+
+    public boolean isWillRetained() {
+        return willRetained;
+    }
+
+    /**
+     * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+     * that this client unexpectedly loses its connection to the server, the server
+     * will publish a message to itself using the supplied details.
+     *
+     * The topic to publish to
+     * The byte payload for the message.
+     * The quality of service to publish the message at (0, 1 or 2).
+     * Whether or not the message should be retained.
+     */
+    public void setWillRetained(boolean willRetained) {
+        this.willRetained = willRetained;
+    }
+
+    public SocketFactory getSocketFactory() {
+        return socketFactory;
+    }
+
+    /**
+     * Sets the SocketFactory to use. This allows an application to
+     * apply its own policies around the creation of network sockets. If using an
+     * SSL connection, an SSLSocketFactory can be used to supply
+     * application-specific security settings.
+     */
+    public void setSocketFactory(SocketFactory socketFactory) {
+        this.socketFactory = socketFactory;
+    }
+
+    public Properties getSslClientProps() {
+        return sslClientProps;
+    }
+
+    /**
+     * Sets the SSL properties for the connection.
+     * <p>
+     * Note that these properties are only valid if an implementation of the Java
+     * Secure Socket Extensions (JSSE) is available. These properties are
+     * <em>not</em> used if a custom SocketFactory has been set.
+     *
+     * The following properties can be used:
+     * </p>
+     * <dl>
+     * <dt>com.ibm.ssl.protocol</dt>
+     * <dd>One of: SSL, SSLv3, TLS, TLSv1, SSL_TLS.</dd>
+     * <dt>com.ibm.ssl.contextProvider</dt>
+     * <dd>Underlying JSSE provider. For example "IBMJSSE2" or "SunJSSE"</dd>
+     *
+     * <dt>com.ibm.ssl.keyStore</dt>
+     * <dd>The name of the file that contains the KeyStore object that you want the
+     * KeyManager to use. For example /mydir/etc/key.p12</dd>
+     *
+     * <dt>com.ibm.ssl.keyStorePassword</dt>
+     * <dd>The password for the KeyStore object that you want the KeyManager to use.
+     * The password can either be in plain-text, or may be obfuscated using the
+     * static method:
+     * <code>com.ibm.micro.security.Password.obfuscate(char[] password)</code>. This
+     * obfuscates the password using a simple and insecure XOR and Base64 encoding
+     * mechanism. Note that this is only a simple scrambler to obfuscate clear-text
+     * passwords.</dd>
+     *
+     * <dt>com.ibm.ssl.keyStoreType</dt>
+     * <dd>Type of key store, for example "PKCS12", "JKS", or "JCEKS".</dd>
+     *
+     * <dt>com.ibm.ssl.keyStoreProvider</dt>
+     * <dd>Key store provider, for example "IBMJCE" or "IBMJCEFIPS".</dd>
+     *
+     * <dt>com.ibm.ssl.trustStore</dt>
+     * <dd>The name of the file that contains the KeyStore object that you want the
+     * TrustManager to use.</dd>
+     *
+     * <dt>com.ibm.ssl.trustStorePassword</dt>
+     * <dd>The password for the TrustStore object that you want the TrustManager to
+     * use. The password can either be in plain-text, or may be obfuscated using the
+     * static method:
+     * <code>com.ibm.micro.security.Password.obfuscate(char[] password)</code>. This
+     * obfuscates the password using a simple and insecure XOR and Base64 encoding
+     * mechanism. Note that this is only a simple scrambler to obfuscate clear-text
+     * passwords.</dd>
+     *
+     * <dt>com.ibm.ssl.trustStoreType</dt>
+     * <dd>The type of KeyStore object that you want the default TrustManager to
+     * use. Same possible values as "keyStoreType".</dd>
+     *
+     * <dt>com.ibm.ssl.trustStoreProvider</dt>
+     * <dd>Trust store provider, for example "IBMJCE" or "IBMJCEFIPS".</dd>
+     *
+     * <dt>com.ibm.ssl.enabledCipherSuites</dt>
+     * <dd>A list of which ciphers are enabled. Values are dependent on the
+     * provider, for example:
+     * SSL_RSA_WITH_AES_128_CBC_SHA;SSL_RSA_WITH_3DES_EDE_CBC_SHA.</dd>
+     *
+     * <dt>com.ibm.ssl.keyManager</dt>
+     * <dd>Sets the algorithm that will be used to instantiate a KeyManagerFactory
+     * object instead of using the default algorithm available in the platform.
+     * Example values: "IbmX509" or "IBMJ9X509".</dd>
+     *
+     * <dt>com.ibm.ssl.trustManager</dt>
+     * <dd>Sets the algorithm that will be used to instantiate a TrustManagerFactory
+     * object instead of using the default algorithm available in the platform.
+     * Example values: "PKIX" or "IBMJ9X509".</dd>
+     * </dl>
+     */
+    public void setSslClientProps(Properties sslClientProps) {
+        this.sslClientProps = sslClientProps;
+    }
+
+    public boolean isHttpsHostnameVerificationEnabled() {
+        return httpsHostnameVerificationEnabled;
+    }
+
+    /**
+     * Whether SSL HostnameVerifier is enabled or not.
+     * The default value is true.
+     */
+    public void setHttpsHostnameVerificationEnabled(boolean httpsHostnameVerificationEnabled) {
+        this.httpsHostnameVerificationEnabled = httpsHostnameVerificationEnabled;
+    }
+
+    public HostnameVerifier getSslHostnameVerifier() {
+        return sslHostnameVerifier;
+    }
+
+    /**
+     * Sets the HostnameVerifier for the SSL connection. Note that it will be used
+     * after the handshake on a connection, and you must take action yourself when
+     * hostname verification fails.
+     * <p>
+     * There is no default HostnameVerifier
+     * </p>
+     */
+    public void setSslHostnameVerifier(HostnameVerifier sslHostnameVerifier) {
+        this.sslHostnameVerifier = sslHostnameVerifier;
+    }
+
+    public boolean isCleanSession() {
+        return cleanSession;
+    }
+
+    /**
+     * Sets whether the client and server should remember state across restarts and
+     * reconnects.
+     * <ul>
+     * <li>If set to false both the client and server will maintain state across
+     * restarts of the client, the server and the connection. As state is
+     * maintained:
+     * <ul>
+     * <li>Message delivery will be reliable meeting the specified QOS even if the
+     * client, server or connection are restarted.
+     * <li>The server will treat a subscription as durable.
+     * </ul>
+     * <li>If set to true the client and server will not maintain state across
+     * restarts of the client, the server or the connection. This means
+     * <ul>
+     * <li>Message delivery to the specified QOS cannot be maintained if the client,
+     * server or connection are restarted
+     * <li>The server will treat a subscription as non-durable
+     * </ul>
+     * </ul>
+     */
+    public void setCleanSession(boolean cleanSession) {
+        this.cleanSession = cleanSession;
+    }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * Sets the connection timeout value. This value, measured in seconds, defines
+     * the maximum time interval the client will wait for the network connection to
+     * the MQTT server to be established. The default timeout is 30 seconds. A value
+     * of 0 disables timeout processing meaning the client will wait until the
+     * network connection is made successfully or fails.
+     */
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public String getServerURIs() {
+        return serverURIs;
+    }
+
+    /**
+     * Set a list of one or more serverURIs the client may connect to.
+     * Multiple servers can be separated by comma.
+     * <p>
+     * Each <code>serverURI</code> specifies the address of a server that the client
+     * may connect to. Two types of connection are supported <code>tcp://</code> for
+     * a TCP connection and <code>ssl://</code> for a TCP connection secured by
+     * SSL/TLS. For example:
+     * <ul>
+     * <li><code>tcp://localhost:1883</code></li>
+     * <li><code>ssl://localhost:8883</code></li>
+     * </ul>
+     * If the port is not specified, it will default to 1883 for
+     * <code>tcp://</code> URIs, and 8883 for <code>ssl://</code> URIs.
+     * <p>
+     * If serverURIs is set then it overrides the serverURI parameter passed in on
+     * the constructor of the MQTT client.
+     * <p>
+     * When an attempt to connect is initiated the client will start with the first
+     * serverURI in the list and work through the list until a connection is
+     * established with a server. If a connection cannot be made to any of the
+     * servers then the connect attempt fails.
+     * <p>
+     * Specifying a list of servers that a client may connect to has several uses:
+     * <ol>
+     * <li>High Availability and reliable message delivery
+     * <p>
+     * Some MQTT servers support a high availability feature where two or more
+     * "equal" MQTT servers share state. An MQTT client can connect to any of the
+     * "equal" servers and be assured that messages are reliably delivered and
+     * durable subscriptions are maintained no matter which server the client
+     * connects to.
+     * </p>
+     * <p>
+     * The cleansession flag must be set to false if durable subscriptions and/or
+     * reliable message delivery is required.
+     * </p>
+     * </li>
+     * <li>Hunt List
+     * <p>
+     * A set of servers may be specified that are not "equal" (as in the high
+     * availability option). As no state is shared across the servers reliable
+     * message delivery and durable subscriptions are not valid. The cleansession
+     * flag must be set to true if the hunt list mode is used
+     * </p>
+     * </li>
+     * </ol>
+     */
+    public void setServerURIs(String serverURIs) {
+        this.serverURIs = serverURIs;
+    }
+
+    public int getMqttVersion() {
+        return mqttVersion;
+    }
+
+    /**
+     * Sets the MQTT version. The default action is to connect with version 3.1.1,
+     * and to fall back to 3.1 if that fails. Version 3.1.1 or 3.1 can be selected
+     * specifically, with no fall back, by using the MQTT_VERSION_3_1_1 or
+     * MQTT_VERSION_3_1 options respectively.
+     */
+    public void setMqttVersion(int mqttVersion) {
+        this.mqttVersion = mqttVersion;
+    }
+
+    public boolean isAutomaticReconnect() {
+        return automaticReconnect;
+    }
+
+    /**
+     * Sets whether the client will automatically attempt to reconnect to the server
+     * if the connection is lost.
+     * <ul>
+     * <li>If set to false, the client will not attempt to automatically reconnect
+     * to the server in the event that the connection is lost.</li>
+     * <li>If set to true, in the event that the connection is lost, the client will
+     * attempt to reconnect to the server. It will initially wait 1 second before it
+     * attempts to reconnect, for every failed reconnect attempt, the delay will
+     * double until it is at 2 minutes at which point the delay will stay at 2
+     * minutes.</li>
+     * </ul>
+     */
+    public void setAutomaticReconnect(boolean automaticReconnect) {
+        this.automaticReconnect = automaticReconnect;
+    }
+
+    public int getMaxReconnectDelay() {
+        return maxReconnectDelay;
+    }
+
+    /**
+     * Set the maximum time (in millis) to wait between reconnects
+     */
+    public void setMaxReconnectDelay(int maxReconnectDelay) {
+        this.maxReconnectDelay = maxReconnectDelay;
+    }
+
+    public Properties getCustomWebSocketHeaders() {
+        return customWebSocketHeaders;
+    }
+
+    /**
+     * Sets the Custom WebSocket Headers for the WebSocket Connection.
+     */
+    public void setCustomWebSocketHeaders(Properties customWebSocketHeaders) {
+        this.customWebSocketHeaders = customWebSocketHeaders;
+    }
+
+    public int getExecutorServiceTimeout() {
+        return executorServiceTimeout;
+    }
+
+    /**
+     * Set the time in seconds that the executor service should wait when
+     * terminating before forcefully terminating. It is not recommended to change
+     * this value unless you are absolutely sure that you need to.
+     */
+    public void setExecutorServiceTimeout(int executorServiceTimeout) {
+        this.executorServiceTimeout = executorServiceTimeout;
+    }
+
+    public PahoConfiguration copy() {
+        try {
+            return (PahoConfiguration) clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+
+}
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 6825b3a..0fb419a 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -36,14 +36,14 @@ public class PahoConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
         String topic = getEndpoint().getTopic();
-        getEndpoint().getClient().subscribe(topic, getEndpoint().getQos());
+        getEndpoint().getClient().subscribe(topic, getEndpoint().getConfiguration().getQos());
         getEndpoint().getClient().setCallback(new MqttCallbackExtended() {
 
             @Override
             public void connectComplete(boolean reconnect, String serverURI) {
                 if (reconnect) {
                     try {
-                        getEndpoint().getClient().subscribe(topic, getEndpoint().getQos());
+                        getEndpoint().getClient().subscribe(topic, getEndpoint().getConfiguration().getQos());
                     } catch (MqttException e) {
                         log.error("MQTT resubscribe failed {}", e.getMessage(), e);
                     }
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 905e234..f84b425 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.component.paho;
 
-import java.util.Set;
-
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -37,64 +34,78 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
 
 /**
- * Component for communicating with MQTT M2M message brokers using Eclipse Paho MQTT Client.
+ * Component for communicating with MQTT message brokers using Eclipse Paho MQTT Client.
  */
 @UriEndpoint(firstVersion = "2.16.0", scheme = "paho", title = "Paho", label = "messaging,iot", syntax = "paho:topic")
 public class PahoEndpoint extends DefaultEndpoint {
 
     // Configuration members
-    @UriPath
+    @UriPath(description = "Name of the topic")
     @Metadata(required = true)
-    private String topic;
-    @UriParam
-    private String clientId = "camel-" + System.nanoTime();
-    @UriParam(defaultValue = PahoConstants.DEFAULT_BROKER_URL)
-    private String brokerUrl = PahoConstants.DEFAULT_BROKER_URL;
-    @UriParam(defaultValue = "2")
-    private int qos = PahoConstants.DEFAULT_QOS;
+    private final String topic;
     @UriParam
-    private boolean retained;
-    @UriParam(defaultValue = "MEMORY")
-    private PahoPersistence persistence = PahoPersistence.MEMORY;
-    @UriParam
-    private String filePersistenceDirectory;
-    @UriParam(defaultValue = "true")
-    private boolean autoReconnect = true;
-    @UriParam(label = "security") @Metadata(secret = true)
-    private String userName;
-    @UriParam(label = "security") @Metadata(secret = true)
-    private String password;
-    @UriParam(label = "advanced", defaultValue = "true")
-    private boolean resolveMqttConnectOptions = true;
-
-    // Collaboration members
-    @UriParam
-    private MqttConnectOptions connectOptions;
-
-    // Auto-configuration members
-
-    private transient MqttClient client;
+    private final PahoConfiguration configuration;
+    @UriParam(label = "advanced")
+    private MqttClient client;
+    private transient boolean stopClient;
 
-    public PahoEndpoint(String uri, String topic, Component component) {
+    public PahoEndpoint(String uri, String topic, PahoComponent component, PahoConfiguration configuration) {
         super(uri, component);
         this.topic = topic;
+        this.configuration = configuration;
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        client = new MqttClient(getBrokerUrl(), getClientId(), resolvePersistence());
-        client.connect(resolveMqttConnectOptions());
+
+        if (client == null) {
+            stopClient = true;
+            client = new MqttClient(configuration.getBrokerUrl(), configuration.getClientId(), resolvePersistence());
+            client.connect(createMqttConnectOptions(configuration));
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (getClient().isConnected()) {
-            getClient().disconnect();
+        if (stopClient && client.isConnected()) {
+            client.disconnect();
         }
+
         super.doStop();
     }
 
+    private static MqttConnectOptions createMqttConnectOptions(PahoConfiguration config) {
+        MqttConnectOptions mq = new MqttConnectOptions();
+        if (ObjectHelper.isNotEmpty(config.getUserName()) && ObjectHelper.isNotEmpty(config.getPassword())) {
+            mq.setUserName(config.getUserName());
+            mq.setPassword(config.getPassword().toCharArray());
+        }
+        mq.setAutomaticReconnect(config.isAutomaticReconnect());
+        mq.setCleanSession(config.isCleanSession());
+        mq.setConnectionTimeout(config.getConnectionTimeout());
+        mq.setExecutorServiceTimeout(config.getExecutorServiceTimeout());
+        mq.setCustomWebSocketHeaders(config.getCustomWebSocketHeaders());
+        mq.setHttpsHostnameVerificationEnabled(config.isHttpsHostnameVerificationEnabled());
+        mq.setKeepAliveInterval(config.getKeepAliveInterval());
+        mq.setMaxInflight(config.getMaxInflight());
+        mq.setMaxReconnectDelay(config.getMaxReconnectDelay());
+        mq.setMqttVersion(config.getMqttVersion());
+        mq.setSocketFactory(config.getSocketFactory());
+        mq.setSSLHostnameVerifier(config.getSslHostnameVerifier());
+        mq.setSSLProperties(config.getSslClientProps());
+        if (config.getWillTopic() != null && config.getWillPayload() != null) {
+            mq.setWill(config.getWillTopic(),
+                    config.getWillPayload().getBytes(),
+                    config.getWillQos(),
+                    config.isWillRetained());
+        }
+        if (config.getServerURIs() != null) {
+            mq.setServerURIs(config.getServerURIs().split(","));
+        }
+        return mq;
+    }
+
     @Override
     public Producer createProducer() throws Exception {
         return new PahoProducer(this);
@@ -107,48 +118,26 @@ public class PahoEndpoint extends DefaultEndpoint {
 
     @Override
     public PahoComponent getComponent() {
-        return (PahoComponent)super.getComponent();
+        return (PahoComponent) super.getComponent();
+    }
+
+    public String getTopic() {
+        return topic;
     }
 
     // Resolvers
     protected MqttClientPersistence resolvePersistence() {
-        if (persistence == PahoPersistence.MEMORY) {
+        if (configuration.getPersistence() == PahoPersistence.MEMORY) {
             return new MemoryPersistence();
         } else {
-            if (filePersistenceDirectory != null) {
-                return new MqttDefaultFilePersistence(filePersistenceDirectory);
+            if (configuration.getFilePersistenceDirectory() != null) {
+                return new MqttDefaultFilePersistence(configuration.getFilePersistenceDirectory());
             } else {
                 return new MqttDefaultFilePersistence();
             }
         }
     }
 
-    protected MqttConnectOptions resolveMqttConnectOptions() {
-        if (connectOptions != null) {
-            return connectOptions;
-        }
-
-        if (resolveMqttConnectOptions) {
-            Set<MqttConnectOptions> connectOptions = getCamelContext().getRegistry().findByType(MqttConnectOptions.class);
-            if (connectOptions.size() == 1) {
-                log.info("Single MqttConnectOptions instance found in the registry. It will be used by the endpoint.");
-                return connectOptions.iterator().next();
-            } else if (connectOptions.size() > 1) {
-                log.warn("Found {} instances of the MqttConnectOptions in the registry. None of these will be used by the endpoint. "
-                         + "Please use 'connectOptions' endpoint option to select one.", connectOptions.size());
-            }
-        }
-
-        MqttConnectOptions options = new MqttConnectOptions();
-        options.setAutomaticReconnect(autoReconnect);
-
-        if (ObjectHelper.isNotEmpty(userName) && ObjectHelper.isNotEmpty(password)) {
-            options.setUserName(userName);
-            options.setPassword(password.toCharArray());
-        }
-        return options;
-    }
-
     public Exchange createExchange(MqttMessage mqttMessage, String topic) {
         Exchange exchange = createExchange();
 
@@ -161,85 +150,8 @@ public class PahoEndpoint extends DefaultEndpoint {
         return exchange;
     }
 
-    // Configuration getters & setters
-
-    public String getClientId() {
-        return clientId;
-    }
-
-    /**
-     * MQTT client identifier.
-     */
-    public void setClientId(String clientId) {
-        this.clientId = clientId;
-    }
-
-    public String getBrokerUrl() {
-        return brokerUrl;
-    }
-
-    /**
-     * The URL of the MQTT broker.
-     */
-    public void setBrokerUrl(String brokerUrl) {
-        this.brokerUrl = brokerUrl;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * Name of the topic
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public int getQos() {
-        return qos;
-    }
-
-    /**
-     * Client quality of service level (0-2).
-     */
-    public void setQos(int qos) {
-        this.qos = qos;
-    }
-
-    public boolean isRetained() {
-        return retained;
-    }
-
-    /**
-     * Retain option
-     */
-    public void setRetained(boolean retained) {
-        this.retained = retained;
-    }
-
-    // Auto-configuration getters & setters
-
-    public PahoPersistence getPersistence() {
-        return persistence;
-    }
-
-    /**
-     * Client persistence to be used - memory or file.
-     */
-    public void setPersistence(PahoPersistence persistence) {
-        this.persistence = persistence;
-    }
-
-    public String getFilePersistenceDirectory() {
-        return filePersistenceDirectory;
-    }
-
-    /**
-     * Base directory used by file persistence. Will by default use user directory.
-     */
-    public void setFilePersistenceDirectory(String filePersistenceDirectory) {
-        this.filePersistenceDirectory = filePersistenceDirectory;
+    public PahoConfiguration getConfiguration() {
+        return configuration;
     }
 
     public MqttClient getClient() {
@@ -247,65 +159,9 @@ public class PahoEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * To use the existing MqttClient instance as client.
+     * To use an existing mqtt client
      */
     public void setClient(MqttClient client) {
         this.client = client;
     }
-
-    public MqttConnectOptions getConnectOptions() {
-        return connectOptions;
-    }
-
-    /**
-     * Client connection options
-     */
-    public void setConnectOptions(MqttConnectOptions connOpts) {
-        this.connectOptions = connOpts;
-    }
-
-    public synchronized boolean isAutoReconnect() {
-        return autoReconnect;
-    }
-
-    /**
-     * Client will automatically attempt to reconnect to the server if the connection is lost
-     */
-    public synchronized void setAutoReconnect(boolean autoReconnect) {
-        this.autoReconnect = autoReconnect;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    /**
-     * Username to be used for authentication against the MQTT broker
-     */
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    /**
-     * Password to be used for authentication against the MQTT broker
-     */
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public synchronized boolean isResolveMqttConnectOptions() {
-        return resolveMqttConnectOptions;
-    }
-
-    /**
-     * Define if you don't want to resolve the MQTT Connect Options from registry
-     */
-    public synchronized void setResolveMqttConnectOptions(boolean resolveMqttConnectOptions) {
-        this.resolveMqttConnectOptions = resolveMqttConnectOptions;
-    }
-
 }
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
index 29e1854..08428a5 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
@@ -32,8 +32,8 @@ public class PahoProducer extends DefaultProducer {
         MqttClient client = getEndpoint().getClient();
 
         String topic = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, getEndpoint().getTopic(), String.class);
-        int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, getEndpoint().getQos(), Integer.class);
-        boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, getEndpoint().isRetained(), Boolean.class);
+        int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, getEndpoint().getConfiguration().getQos(), Integer.class);
+        boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, getEndpoint().getConfiguration().isRetained(), Boolean.class);
         byte[] payload = exchange.getIn().getBody(byte[].class);
 
         MqttMessage message = new MqttMessage(payload);
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 33ae40d..9a48988 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -19,23 +19,18 @@ package org.apache.camel.component.paho;
 import java.io.UnsupportedEncodingException;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.junit.After;
 import org.junit.Test;
 
 public class PahoComponentTest extends CamelTestSupport {
 
-    @BindToRegistry("connectOptions")
-    MqttConnectOptions connectOptions = new MqttConnectOptions();
-
     @EndpointInject("mock:test")
     MockEndpoint mock;
 
@@ -82,8 +77,6 @@ public class PahoComponentTest extends CamelTestSupport {
 
                 from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
 
-                from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort);
-
                 from("direct:testCustomizedPaho").to("customizedPaho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort);
                 from("paho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort).to("mock:testCustomizedPaho");
             }
@@ -100,10 +93,10 @@ public class PahoComponentTest extends CamelTestSupport {
 
         // Then
         assertEquals("/test/topic", endpoint.getTopic());
-        assertEquals("sampleClient", endpoint.getClientId());
-        assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
-        assertEquals(2, endpoint.getQos());
-        assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
+        assertEquals("sampleClient", endpoint.getConfiguration().getClientId());
+        assertEquals("tcp://localhost:" + mqttPort, endpoint.getConfiguration().getBrokerUrl());
+        assertEquals(2, endpoint.getConfiguration().getQos());
+        assertEquals(PahoPersistence.FILE, endpoint.getConfiguration().getPersistence());
     }
 
     @Test
@@ -132,25 +125,6 @@ public class PahoComponentTest extends CamelTestSupport {
     }
 
     @Test
-    public void shouldUseConnectionOptionsFromRegistry() {
-        // Given
-        PahoEndpoint pahoWithConnectOptionsFromRegistry = getMandatoryEndpoint("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort,
-                                                                               PahoEndpoint.class);
-
-        // Then
-        assertSame(connectOptions, pahoWithConnectOptionsFromRegistry.resolveMqttConnectOptions());
-    }
-
-    @Test
-    public void shouldAutomaticallyUseConnectionOptionsFromRegistry() {
-        // Given
-        PahoEndpoint pahoWithConnectOptionsFromRegistry = getMandatoryEndpoint("paho:registryConnectOptions?brokerUrl=tcp://localhost:" + mqttPort, PahoEndpoint.class);
-
-        // Then
-        assertSame(connectOptions, pahoWithConnectOptionsFromRegistry.resolveMqttConnectOptions());
-    }
-
-    @Test
     public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
         // Given
         final String msg = "msg";
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java
index 9182566..ef751a4 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java
@@ -17,19 +17,14 @@
 package org.apache.camel.component.paho;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.camel.BindToRegistry;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.junit.After;
 import org.junit.Test;
 
 public class PahoOverrideTopicTest extends CamelTestSupport {
 
-    @BindToRegistry("connectOptions")
-    MqttConnectOptions connectOptions = new MqttConnectOptions();
-
     BrokerService broker;
 
     int mqttPort = AvailablePortFinder.getNextAvailable();