You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/11 09:44:11 UTC
[camel] 01/02: CAMEL-14052: camel-paho make it possible to
configure all its options via compoent options that also works for Spring
Boot
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 65885bee95091034b3625690f2ef1bebf649089a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Oct 11 10:52:03 2019 +0200
CAMEL-14052: camel-paho make it possible to configure all its options via compoent options that also works for Spring Boot
---
.../apache/camel/component/paho/PahoComponent.java | 91 +---
.../camel/component/paho/PahoConfiguration.java | 581 +++++++++++++++++++++
.../apache/camel/component/paho/PahoConsumer.java | 4 +-
.../apache/camel/component/paho/PahoEndpoint.java | 264 +++-------
.../apache/camel/component/paho/PahoProducer.java | 4 +-
.../camel/component/paho/PahoComponentTest.java | 34 +-
.../component/paho/PahoOverrideTopicTest.java | 5 -
7 files changed, 675 insertions(+), 308 deletions(-)
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
index d045bbc..30f43ce 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java
@@ -23,7 +23,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttClient;
/**
* Component to integrate with the Eclipse Paho MQTT library.
@@ -31,15 +31,11 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@Component("paho")
public class PahoComponent extends DefaultComponent {
- private String brokerUrl;
- private String clientId;
- @Metadata(label = "security", secret = true)
- private String userName;
- @Metadata(label = "security", secret = true)
- private String password;
+ private PahoConfiguration configuration = new PahoConfiguration();
+
@Metadata(label = "advanced")
- private MqttConnectOptions connectOptions;
-
+ private MqttClient client;
+
public PahoComponent() {
this(null);
}
@@ -52,83 +48,48 @@ public class PahoComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- PahoEndpoint answer = new PahoEndpoint(uri, remaining, this);
+ // Each endpoint can have its own configuration so make
+ // a copy of the configuration
+ PahoConfiguration configuration = getConfiguration().copy();
- if (brokerUrl != null) {
- answer.setBrokerUrl(brokerUrl);
- }
- if (clientId != null) {
- answer.setClientId(clientId);
- }
- if (userName != null) {
- answer.setUserName(userName);
- }
- if (password != null) {
- answer.setPassword(password);
- }
- if (connectOptions != null) {
- answer.setConnectOptions(connectOptions);
- }
+ PahoEndpoint answer = new PahoEndpoint(uri, remaining, this, configuration);
+ answer.setClient(client);
+ setProperties(configuration, parameters);
setProperties(answer, parameters);
return answer;
}
- // Getters and setters
-
- public String getBrokerUrl() {
- return brokerUrl;
+ public PahoConfiguration getConfiguration() {
+ return configuration;
}
/**
- * The URL of the MQTT broker.
+ * To use the shared Paho configuration
*/
- public void setBrokerUrl(String brokerUrl) {
- this.brokerUrl = brokerUrl;
+ public void setConfiguration(PahoConfiguration configuration) {
+ this.configuration = configuration;
}
- public String getClientId() {
- return clientId;
+ public MqttClient getClient() {
+ return client;
}
/**
- * MQTT client identifier.
+ * To use a shared Paho client
*/
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
- public String getUserName() {
- return userName;
+ public void setClient(MqttClient client) {
+ this.client = client;
}
- /**
- * Username to be used for authentication against the MQTT broker
- */
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- /**
- * Password to be used for authentication against the MQTT broker
- */
- public void setPassword(String password) {
- this.password = password;
- }
-
- public MqttConnectOptions getConnectOptions() {
- return connectOptions;
+ public String getBrokerUrl() {
+ return configuration.getBrokerUrl();
}
/**
- * Client connection options
+ * The URL of the MQTT broker.
*/
- public void setConnectOptions(MqttConnectOptions connectOptions) {
- this.connectOptions = connectOptions;
+ public void setBrokerUrl(String brokerUrl) {
+ configuration.setBrokerUrl(brokerUrl);
}
-
}
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
new file mode 100644
index 0000000..96eb341
--- /dev/null
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+import java.util.Properties;
+import javax.net.SocketFactory;
+import javax.net.ssl.HostnameVerifier;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+@UriParams
+public class PahoConfiguration implements Cloneable {
+
+ @UriParam
+ private String clientId = "camel-" + System.nanoTime();
+ @UriParam(defaultValue = PahoConstants.DEFAULT_BROKER_URL)
+ private String brokerUrl = PahoConstants.DEFAULT_BROKER_URL;
+ @UriParam(defaultValue = "2")
+ private int qos = PahoConstants.DEFAULT_QOS;
+ @UriParam
+ private boolean retained;
+ @UriParam(defaultValue = "MEMORY")
+ private PahoPersistence persistence = PahoPersistence.MEMORY;
+ @UriParam
+ private String filePersistenceDirectory;
+
+ @UriParam(defaultValue = "60")
+ private int keepAliveInterval = 60;
+ @UriParam(defaultValue = "10")
+ private int maxInflight = 10;
+ @UriParam
+ private String willTopic;
+ @UriParam
+ private String willPayload;
+ @UriParam
+ private int willQos;
+ @UriParam
+ private boolean willRetained;
+ @UriParam(label = "security") @Metadata(secret = true)
+ private String userName;
+ @UriParam(label = "security") @Metadata(secret = true)
+ private String password;
+ @UriParam(label = "security")
+ private SocketFactory socketFactory;
+ @UriParam(label = "security")
+ private Properties sslClientProps;
+ @UriParam(label = "security", defaultValue = "true")
+ private boolean httpsHostnameVerificationEnabled = true;
+ @UriParam(label = "security")
+ private HostnameVerifier sslHostnameVerifier;
+ @UriParam(defaultValue = "true")
+ private boolean cleanSession = true;
+ @UriParam(defaultValue = "30")
+ private int connectionTimeout = 30;
+ @UriParam
+ private String serverURIs;
+ @UriParam
+ private int mqttVersion;
+ @UriParam(defaultValue = "true")
+ private boolean automaticReconnect = true;
+ @UriParam(defaultValue = "128000")
+ private int maxReconnectDelay = 128000;
+ @UriParam(label = "advanced")
+ private Properties customWebSocketHeaders;
+ @UriParam(label = "advanced", defaultValue = "1")
+ private int executorServiceTimeout = 1;
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ /**
+ * MQTT client identifier.
+ */
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+
+ /**
+ * The URL of the MQTT broker.
+ */
+ public void setBrokerUrl(String brokerUrl) {
+ this.brokerUrl = brokerUrl;
+ }
+
+ public int getQos() {
+ return qos;
+ }
+
+ /**
+ * Client quality of service level (0-2).
+ */
+ public void setQos(int qos) {
+ this.qos = qos;
+ }
+
+ public boolean isRetained() {
+ return retained;
+ }
+
+ /**
+ * Retain option
+ */
+ public void setRetained(boolean retained) {
+ this.retained = retained;
+ }
+
+ public PahoPersistence getPersistence() {
+ return persistence;
+ }
+
+ /**
+ * Client persistence to be used - memory or file.
+ */
+ public void setPersistence(PahoPersistence persistence) {
+ this.persistence = persistence;
+ }
+
+ public String getFilePersistenceDirectory() {
+ return filePersistenceDirectory;
+ }
+
+ /**
+ * Base directory used by file persistence. Will by default use user directory.
+ */
+ public void setFilePersistenceDirectory(String filePersistenceDirectory) {
+ this.filePersistenceDirectory = filePersistenceDirectory;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * Username to be used for authentication against the MQTT broker
+ */
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Password to be used for authentication against the MQTT broker
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public int getKeepAliveInterval() {
+ return keepAliveInterval;
+ }
+
+ /**
+ * Sets the keep alive interval. This value, measured in seconds, defines the
+ * maximum time interval between messages sent or received. It enables the
+ * client to detect if the server is no longer available, without having to wait
+ * for the TCP/IP timeout. The client will ensure that at least one message
+ * travels across the network within each keep alive period. In the absence of a
+ * data-related message during the time period, the client sends a very small
+ * ping message, which the server will acknowledge. A value of 0 disables
+ * keepalive processing in the client.
+ * <p>
+ * The default value is 60 seconds
+ * </p>
+ */
+ public void setKeepAliveInterval(int keepAliveInterval) {
+ this.keepAliveInterval = keepAliveInterval;
+ }
+
+ public int getMaxInflight() {
+ return maxInflight;
+ }
+
+ /**
+ * Sets the max inflight. please increase this value in a high traffic
+ * environment.
+ * <p>
+ * The default value is 10
+ * </p>
+ */
+ public void setMaxInflight(int maxInflight) {
+ this.maxInflight = maxInflight;
+ }
+
+ public String getWillTopic() {
+ return willTopic;
+ }
+
+ /**
+ * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+ * that this client unexpectedly loses its connection to the server, the server
+ * will publish a message to itself using the supplied details.
+ *
+ * The topic to publish to
+ * The byte payload for the message.
+ * The quality of service to publish the message at (0, 1 or 2).
+ * Whether or not the message should be retained.
+ */
+ public void setWillTopic(String willTopic) {
+ this.willTopic = willTopic;
+ }
+
+ public String getWillPayload() {
+ return willPayload;
+ }
+
+ /**
+ * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+ * that this client unexpectedly loses its connection to the server, the server
+ * will publish a message to itself using the supplied details.
+ *
+ * The topic to publish to
+ * The byte payload for the message.
+ * The quality of service to publish the message at (0, 1 or 2).
+ * Whether or not the message should be retained.
+ */
+ public void setWillPayload(String willPayload) {
+ this.willPayload = willPayload;
+ }
+
+ public int getWillQos() {
+ return willQos;
+ }
+
+ /**
+ * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+ * that this client unexpectedly loses its connection to the server, the server
+ * will publish a message to itself using the supplied details.
+ *
+ * The topic to publish to
+ * The byte payload for the message.
+ * The quality of service to publish the message at (0, 1 or 2).
+ * Whether or not the message should be retained.
+ */
+ public void setWillQos(int willQos) {
+ this.willQos = willQos;
+ }
+
+ public boolean isWillRetained() {
+ return willRetained;
+ }
+
+ /**
+ * Sets the "Last Will and Testament" (LWT) for the connection. In the event
+ * that this client unexpectedly loses its connection to the server, the server
+ * will publish a message to itself using the supplied details.
+ *
+ * The topic to publish to
+ * The byte payload for the message.
+ * The quality of service to publish the message at (0, 1 or 2).
+ * Whether or not the message should be retained.
+ */
+ public void setWillRetained(boolean willRetained) {
+ this.willRetained = willRetained;
+ }
+
+ public SocketFactory getSocketFactory() {
+ return socketFactory;
+ }
+
+ /**
+ * Sets the SocketFactory to use. This allows an application to
+ * apply its own policies around the creation of network sockets. If using an
+ * SSL connection, an SSLSocketFactory can be used to supply
+ * application-specific security settings.
+ */
+ public void setSocketFactory(SocketFactory socketFactory) {
+ this.socketFactory = socketFactory;
+ }
+
+ public Properties getSslClientProps() {
+ return sslClientProps;
+ }
+
+ /**
+ * Sets the SSL properties for the connection.
+ * <p>
+ * Note that these properties are only valid if an implementation of the Java
+ * Secure Socket Extensions (JSSE) is available. These properties are
+ * <em>not</em> used if a custom SocketFactory has been set.
+ *
+ * The following properties can be used:
+ * </p>
+ * <dl>
+ * <dt>com.ibm.ssl.protocol</dt>
+ * <dd>One of: SSL, SSLv3, TLS, TLSv1, SSL_TLS.</dd>
+ * <dt>com.ibm.ssl.contextProvider
+ * <dd>Underlying JSSE provider. For example "IBMJSSE2" or "SunJSSE"</dd>
+ *
+ * <dt>com.ibm.ssl.keyStore</dt>
+ * <dd>The name of the file that contains the KeyStore object that you want the
+ * KeyManager to use. For example /mydir/etc/key.p12</dd>
+ *
+ * <dt>com.ibm.ssl.keyStorePassword</dt>
+ * <dd>The password for the KeyStore object that you want the KeyManager to use.
+ * The password can either be in plain-text, or may be obfuscated using the
+ * static method:
+ * <code>com.ibm.micro.security.Password.obfuscate(char[] password)</code>. This
+ * obfuscates the password using a simple and insecure XOR and Base64 encoding
+ * mechanism. Note that this is only a simple scrambler to obfuscate clear-text
+ * passwords.</dd>
+ *
+ * <dt>com.ibm.ssl.keyStoreType</dt>
+ * <dd>Type of key store, for example "PKCS12", "JKS", or "JCEKS".</dd>
+ *
+ * <dt>com.ibm.ssl.keyStoreProvider</dt>
+ * <dd>Key store provider, for example "IBMJCE" or "IBMJCEFIPS".</dd>
+ *
+ * <dt>com.ibm.ssl.trustStore</dt>
+ * <dd>The name of the file that contains the KeyStore object that you want the
+ * TrustManager to use.</dd>
+ *
+ * <dt>com.ibm.ssl.trustStorePassword</dt>
+ * <dd>The password for the TrustStore object that you want the TrustManager to
+ * use. The password can either be in plain-text, or may be obfuscated using the
+ * static method:
+ * <code>com.ibm.micro.security.Password.obfuscate(char[] password)</code>. This
+ * obfuscates the password using a simple and insecure XOR and Base64 encoding
+ * mechanism. Note that this is only a simple scrambler to obfuscate clear-text
+ * passwords.</dd>
+ *
+ * <dt>com.ibm.ssl.trustStoreType</dt>
+ * <dd>The type of KeyStore object that you want the default TrustManager to
+ * use. Same possible values as "keyStoreType".</dd>
+ *
+ * <dt>com.ibm.ssl.trustStoreProvider</dt>
+ * <dd>Trust store provider, for example "IBMJCE" or "IBMJCEFIPS".</dd>
+ *
+ * <dt>com.ibm.ssl.enabledCipherSuites</dt>
+ * <dd>A list of which ciphers are enabled. Values are dependent on the
+ * provider, for example:
+ * SSL_RSA_WITH_AES_128_CBC_SHA;SSL_RSA_WITH_3DES_EDE_CBC_SHA.</dd>
+ *
+ * <dt>com.ibm.ssl.keyManager</dt>
+ * <dd>Sets the algorithm that will be used to instantiate a KeyManagerFactory
+ * object instead of using the default algorithm available in the platform.
+ * Example values: "IbmX509" or "IBMJ9X509".</dd>
+ *
+ * <dt>com.ibm.ssl.trustManager</dt>
+ * <dd>Sets the algorithm that will be used to instantiate a TrustManagerFactory
+ * object instead of using the default algorithm available in the platform.
+ * Example values: "PKIX" or "IBMJ9X509".</dd>
+ * </dl>
+ */
+ public void setSslClientProps(Properties sslClientProps) {
+ this.sslClientProps = sslClientProps;
+ }
+
+ public boolean isHttpsHostnameVerificationEnabled() {
+ return httpsHostnameVerificationEnabled;
+ }
+
+ /**
+ * Whether SSL HostnameVerifier is enabled or not.
+ * The default value is true.
+ */
+ public void setHttpsHostnameVerificationEnabled(boolean httpsHostnameVerificationEnabled) {
+ this.httpsHostnameVerificationEnabled = httpsHostnameVerificationEnabled;
+ }
+
+ public HostnameVerifier getSslHostnameVerifier() {
+ return sslHostnameVerifier;
+ }
+
+ /**
+ * Sets the HostnameVerifier for the SSL connection. Note that it will be used
+ * after handshake on a connection and you should do actions by yourself when
+ * hostname is verified error.
+ * <p>
+ * There is no default HostnameVerifier
+ * </p>
+ */
+ public void setSslHostnameVerifier(HostnameVerifier sslHostnameVerifier) {
+ this.sslHostnameVerifier = sslHostnameVerifier;
+ }
+
+ public boolean isCleanSession() {
+ return cleanSession;
+ }
+
+ /**
+ * Sets whether the client and server should remember state across restarts and
+ * reconnects.
+ * <ul>
+ * <li>If set to false both the client and server will maintain state across
+ * restarts of the client, the server and the connection. As state is
+ * maintained:
+ * <ul>
+ * <li>Message delivery will be reliable meeting the specified QOS even if the
+ * client, server or connection are restarted.
+ * <li>The server will treat a subscription as durable.
+ * </ul>
+ * <li>If set to true the client and server will not maintain state across
+ * restarts of the client, the server or the connection. This means
+ * <ul>
+ * <li>Message delivery to the specified QOS cannot be maintained if the client,
+ * server or connection are restarted
+ * <li>The server will treat a subscription as non-durable
+ * </ul>
+ * </ul>
+ */
+ public void setCleanSession(boolean cleanSession) {
+ this.cleanSession = cleanSession;
+ }
+
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Sets the connection timeout value. This value, measured in seconds, defines
+ * the maximum time interval the client will wait for the network connection to
+ * the MQTT server to be established. The default timeout is 30 seconds. A value
+ * of 0 disables timeout processing meaning the client will wait until the
+ * network connection is made successfully or fails.
+ */
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public String getServerURIs() {
+ return serverURIs;
+ }
+
+ /**
+ * Set a list of one or more serverURIs the client may connect to.
+ * Multiple servers can be separated by comma.
+ * <p>
+ * Each <code>serverURI</code> specifies the address of a server that the client
+ * may connect to. Two types of connection are supported <code>tcp://</code> for
+ * a TCP connection and <code>ssl://</code> for a TCP connection secured by
+ * SSL/TLS. For example:
+ * <ul>
+ * <li><code>tcp://localhost:1883</code></li>
+ * <li><code>ssl://localhost:8883</code></li>
+ * </ul>
+ * If the port is not specified, it will default to 1883 for
+ * <code>tcp://</code>" URIs, and 8883 for <code>ssl://</code> URIs.
+ * <p>
+ * If serverURIs is set then it overrides the serverURI parameter passed in on
+ * the constructor of the MQTT client.
+ * <p>
+ * When an attempt to connect is initiated the client will start with the first
+ * serverURI in the list and work through the list until a connection is
+ * established with a server. If a connection cannot be made to any of the
+ * servers then the connect attempt fails.
+ * <p>
+ * Specifying a list of servers that a client may connect to has several uses:
+ * <ol>
+ * <li>High Availability and reliable message delivery
+ * <p>
+ * Some MQTT servers support a high availability feature where two or more
+ * "equal" MQTT servers share state. An MQTT client can connect to any of the
+ * "equal" servers and be assured that messages are reliably delivered and
+ * durable subscriptions are maintained no matter which server the client
+ * connects to.
+ * </p>
+ * <p>
+ * The cleansession flag must be set to false if durable subscriptions and/or
+ * reliable message delivery is required.
+ * </p>
+ * </li>
+ * <li>Hunt List
+ * <p>
+ * A set of servers may be specified that are not "equal" (as in the high
+ * availability option). As no state is shared across the servers reliable
+ * message delivery and durable subscriptions are not valid. The cleansession
+ * flag must be set to true if the hunt list mode is used
+ * </p>
+ * </li>
+ * </ol>
+ */
+ public void setServerURIs(String serverURIs) {
+ this.serverURIs = serverURIs;
+ }
+
+ public int getMqttVersion() {
+ return mqttVersion;
+ }
+
+ /**
+ * Sets the MQTT version. The default action is to connect with version 3.1.1,
+ * and to fall back to 3.1 if that fails. Version 3.1.1 or 3.1 can be selected
+ * specifically, with no fall back, by using the MQTT_VERSION_3_1_1 or
+ * MQTT_VERSION_3_1 options respectively.
+ */
+ public void setMqttVersion(int mqttVersion) {
+ this.mqttVersion = mqttVersion;
+ }
+
+ public boolean isAutomaticReconnect() {
+ return automaticReconnect;
+ }
+
+ /**
+ * Sets whether the client will automatically attempt to reconnect to the server
+ * if the connection is lost.
+ * <ul>
+ * <li>If set to false, the client will not attempt to automatically reconnect
+ * to the server in the event that the connection is lost.</li>
+ * <li>If set to true, in the event that the connection is lost, the client will
+ * attempt to reconnect to the server. It will initially wait 1 second before it
+ * attempts to reconnect, for every failed reconnect attempt, the delay will
+ * double until it is at 2 minutes at which point the delay will stay at 2
+ * minutes.</li>
+ * </ul>
+ */
+ public void setAutomaticReconnect(boolean automaticReconnect) {
+ this.automaticReconnect = automaticReconnect;
+ }
+
+ public int getMaxReconnectDelay() {
+ return maxReconnectDelay;
+ }
+
+ /**
+ * Get the maximum time (in millis) to wait between reconnects
+ */
+ public void setMaxReconnectDelay(int maxReconnectDelay) {
+ this.maxReconnectDelay = maxReconnectDelay;
+ }
+
+ public Properties getCustomWebSocketHeaders() {
+ return customWebSocketHeaders;
+ }
+
+ /**
+ * Sets the Custom WebSocket Headers for the WebSocket Connection.
+ */
+ public void setCustomWebSocketHeaders(Properties customWebSocketHeaders) {
+ this.customWebSocketHeaders = customWebSocketHeaders;
+ }
+
+ public int getExecutorServiceTimeout() {
+ return executorServiceTimeout;
+ }
+
+ /**
+ * Set the time in seconds that the executor service should wait when
+ * terminating before forcefully terminating. It is not recommended to change
+ * this value unless you are absolutely sure that you need to.
+ */
+ public void setExecutorServiceTimeout(int executorServiceTimeout) {
+ this.executorServiceTimeout = executorServiceTimeout;
+ }
+
+ public PahoConfiguration copy() {
+ try {
+ return (PahoConfiguration) clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+
+}
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 6825b3a..0fb419a 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -36,14 +36,14 @@ public class PahoConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
String topic = getEndpoint().getTopic();
- getEndpoint().getClient().subscribe(topic, getEndpoint().getQos());
+ getEndpoint().getClient().subscribe(topic, getEndpoint().getConfiguration().getQos());
getEndpoint().getClient().setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
try {
- getEndpoint().getClient().subscribe(topic, getEndpoint().getQos());
+ getEndpoint().getClient().subscribe(topic, getEndpoint().getConfiguration().getQos());
} catch (MqttException e) {
log.error("MQTT resubscribe failed {}", e.getMessage(), e);
}
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 905e234..f84b425 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.component.paho;
-import java.util.Set;
-
-import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -37,64 +34,78 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
/**
- * Component for communicating with MQTT M2M message brokers using Eclipse Paho MQTT Client.
+ * Component for communicating with MQTT message brokers using Eclipse Paho MQTT Client.
*/
@UriEndpoint(firstVersion = "2.16.0", scheme = "paho", title = "Paho", label = "messaging,iot", syntax = "paho:topic")
public class PahoEndpoint extends DefaultEndpoint {
// Configuration members
- @UriPath
+ @UriPath(description = "Name of the topic")
@Metadata(required = true)
- private String topic;
- @UriParam
- private String clientId = "camel-" + System.nanoTime();
- @UriParam(defaultValue = PahoConstants.DEFAULT_BROKER_URL)
- private String brokerUrl = PahoConstants.DEFAULT_BROKER_URL;
- @UriParam(defaultValue = "2")
- private int qos = PahoConstants.DEFAULT_QOS;
+ private final String topic;
@UriParam
- private boolean retained;
- @UriParam(defaultValue = "MEMORY")
- private PahoPersistence persistence = PahoPersistence.MEMORY;
- @UriParam
- private String filePersistenceDirectory;
- @UriParam(defaultValue = "true")
- private boolean autoReconnect = true;
- @UriParam(label = "security") @Metadata(secret = true)
- private String userName;
- @UriParam(label = "security") @Metadata(secret = true)
- private String password;
- @UriParam(label = "advanced", defaultValue = "true")
- private boolean resolveMqttConnectOptions = true;
-
- // Collaboration members
- @UriParam
- private MqttConnectOptions connectOptions;
-
- // Auto-configuration members
-
- private transient MqttClient client;
+ private final PahoConfiguration configuration;
+ @UriParam(label = "advanced")
+ private MqttClient client;
+ private transient boolean stopClient;
- public PahoEndpoint(String uri, String topic, Component component) {
+ public PahoEndpoint(String uri, String topic, PahoComponent component, PahoConfiguration configuration) {
super(uri, component);
this.topic = topic;
+ this.configuration = configuration;
}
@Override
protected void doStart() throws Exception {
super.doStart();
- client = new MqttClient(getBrokerUrl(), getClientId(), resolvePersistence());
- client.connect(resolveMqttConnectOptions());
+
+ if (client == null) {
+ stopClient = true;
+ client = new MqttClient(configuration.getBrokerUrl(), configuration.getClientId(), resolvePersistence());
+ client.connect(createMqttConnectOptions(configuration));
+ }
}
@Override
protected void doStop() throws Exception {
- if (getClient().isConnected()) {
- getClient().disconnect();
+ if (stopClient && client.isConnected()) {
+ client.disconnect();
}
+
super.doStop();
}
+ private static MqttConnectOptions createMqttConnectOptions(PahoConfiguration config) {
+ MqttConnectOptions mq = new MqttConnectOptions();
+ if (ObjectHelper.isNotEmpty(config.getUserName()) && ObjectHelper.isNotEmpty(config.getPassword())) {
+ mq.setUserName(config.getUserName());
+ mq.setPassword(config.getPassword().toCharArray());
+ }
+ mq.setAutomaticReconnect(config.isAutomaticReconnect());
+ mq.setCleanSession(config.isCleanSession());
+ mq.setConnectionTimeout(config.getConnectionTimeout());
+ mq.setExecutorServiceTimeout(config.getExecutorServiceTimeout());
+ mq.setCustomWebSocketHeaders(config.getCustomWebSocketHeaders());
+ mq.setHttpsHostnameVerificationEnabled(config.isHttpsHostnameVerificationEnabled());
+ mq.setKeepAliveInterval(config.getKeepAliveInterval());
+ mq.setMaxInflight(config.getMaxInflight());
+ mq.setMaxReconnectDelay(config.getMaxReconnectDelay());
+ mq.setMqttVersion(config.getMqttVersion());
+ mq.setSocketFactory(config.getSocketFactory());
+ mq.setSSLHostnameVerifier(config.getSslHostnameVerifier());
+ mq.setSSLProperties(config.getSslClientProps());
+ if (config.getWillTopic() != null && config.getWillPayload() != null) {
+ mq.setWill(config.getWillTopic(),
+ config.getWillPayload().getBytes(),
+ config.getWillQos(),
+ config.isWillRetained());
+ }
+ if (config.getServerURIs() != null) {
+ mq.setServerURIs(config.getServerURIs().split(","));
+ }
+ return mq;
+ }
+
@Override
public Producer createProducer() throws Exception {
return new PahoProducer(this);
@@ -107,48 +118,26 @@ public class PahoEndpoint extends DefaultEndpoint {
@Override
public PahoComponent getComponent() {
- return (PahoComponent)super.getComponent();
+ return (PahoComponent) super.getComponent();
+ }
+
+ public String getTopic() {
+ return topic;
}
// Resolvers
protected MqttClientPersistence resolvePersistence() {
- if (persistence == PahoPersistence.MEMORY) {
+ if (configuration.getPersistence() == PahoPersistence.MEMORY) {
return new MemoryPersistence();
} else {
- if (filePersistenceDirectory != null) {
- return new MqttDefaultFilePersistence(filePersistenceDirectory);
+ if (configuration.getFilePersistenceDirectory() != null) {
+ return new MqttDefaultFilePersistence(configuration.getFilePersistenceDirectory());
} else {
return new MqttDefaultFilePersistence();
}
}
}
- protected MqttConnectOptions resolveMqttConnectOptions() {
- if (connectOptions != null) {
- return connectOptions;
- }
-
- if (resolveMqttConnectOptions) {
- Set<MqttConnectOptions> connectOptions = getCamelContext().getRegistry().findByType(MqttConnectOptions.class);
- if (connectOptions.size() == 1) {
- log.info("Single MqttConnectOptions instance found in the registry. It will be used by the endpoint.");
- return connectOptions.iterator().next();
- } else if (connectOptions.size() > 1) {
- log.warn("Found {} instances of the MqttConnectOptions in the registry. None of these will be used by the endpoint. "
- + "Please use 'connectOptions' endpoint option to select one.", connectOptions.size());
- }
- }
-
- MqttConnectOptions options = new MqttConnectOptions();
- options.setAutomaticReconnect(autoReconnect);
-
- if (ObjectHelper.isNotEmpty(userName) && ObjectHelper.isNotEmpty(password)) {
- options.setUserName(userName);
- options.setPassword(password.toCharArray());
- }
- return options;
- }
-
public Exchange createExchange(MqttMessage mqttMessage, String topic) {
Exchange exchange = createExchange();
@@ -161,85 +150,8 @@ public class PahoEndpoint extends DefaultEndpoint {
return exchange;
}
- // Configuration getters & setters
-
- public String getClientId() {
- return clientId;
- }
-
- /**
- * MQTT client identifier.
- */
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
- public String getBrokerUrl() {
- return brokerUrl;
- }
-
- /**
- * The URL of the MQTT broker.
- */
- public void setBrokerUrl(String brokerUrl) {
- this.brokerUrl = brokerUrl;
- }
-
- public String getTopic() {
- return topic;
- }
-
- /**
- * Name of the topic
- */
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getQos() {
- return qos;
- }
-
- /**
- * Client quality of service level (0-2).
- */
- public void setQos(int qos) {
- this.qos = qos;
- }
-
- public boolean isRetained() {
- return retained;
- }
-
- /**
- * Retain option
- */
- public void setRetained(boolean retained) {
- this.retained = retained;
- }
-
- // Auto-configuration getters & setters
-
- public PahoPersistence getPersistence() {
- return persistence;
- }
-
- /**
- * Client persistence to be used - memory or file.
- */
- public void setPersistence(PahoPersistence persistence) {
- this.persistence = persistence;
- }
-
- public String getFilePersistenceDirectory() {
- return filePersistenceDirectory;
- }
-
- /**
- * Base directory used by file persistence. Will by default use user directory.
- */
- public void setFilePersistenceDirectory(String filePersistenceDirectory) {
- this.filePersistenceDirectory = filePersistenceDirectory;
+ public PahoConfiguration getConfiguration() {
+ return configuration;
}
public MqttClient getClient() {
@@ -247,65 +159,9 @@ public class PahoEndpoint extends DefaultEndpoint {
}
/**
- * To use the existing MqttClient instance as client.
+ * To use an exiting mqtt client
*/
public void setClient(MqttClient client) {
this.client = client;
}
-
- public MqttConnectOptions getConnectOptions() {
- return connectOptions;
- }
-
- /**
- * Client connection options
- */
- public void setConnectOptions(MqttConnectOptions connOpts) {
- this.connectOptions = connOpts;
- }
-
- public synchronized boolean isAutoReconnect() {
- return autoReconnect;
- }
-
- /**
- * Client will automatically attempt to reconnect to the server if the connection is lost
- */
- public synchronized void setAutoReconnect(boolean autoReconnect) {
- this.autoReconnect = autoReconnect;
- }
-
- public String getUserName() {
- return userName;
- }
-
- /**
- * Username to be used for authentication against the MQTT broker
- */
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- /**
- * Password to be used for authentication against the MQTT broker
- */
- public void setPassword(String password) {
- this.password = password;
- }
-
- public synchronized boolean isResolveMqttConnectOptions() {
- return resolveMqttConnectOptions;
- }
-
- /**
- * Define if you don't want to resolve the MQTT Connect Options from registry
- */
- public synchronized void setResolveMqttConnectOptions(boolean resolveMqttConnectOptions) {
- this.resolveMqttConnectOptions = resolveMqttConnectOptions;
- }
-
}
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
index 29e1854..08428a5 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
@@ -32,8 +32,8 @@ public class PahoProducer extends DefaultProducer {
MqttClient client = getEndpoint().getClient();
String topic = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, getEndpoint().getTopic(), String.class);
- int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, getEndpoint().getQos(), Integer.class);
- boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, getEndpoint().isRetained(), Boolean.class);
+ int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, getEndpoint().getConfiguration().getQos(), Integer.class);
+ boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, getEndpoint().getConfiguration().isRetained(), Boolean.class);
byte[] payload = exchange.getIn().getBody(byte[].class);
MqttMessage message = new MqttMessage(payload);
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
index 33ae40d..9a48988 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java
@@ -19,23 +19,18 @@ package org.apache.camel.component.paho;
import java.io.UnsupportedEncodingException;
import org.apache.activemq.broker.BrokerService;
-import org.apache.camel.BindToRegistry;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.After;
import org.junit.Test;
public class PahoComponentTest extends CamelTestSupport {
- @BindToRegistry("connectOptions")
- MqttConnectOptions connectOptions = new MqttConnectOptions();
-
@EndpointInject("mock:test")
MockEndpoint mock;
@@ -82,8 +77,6 @@ public class PahoComponentTest extends CamelTestSupport {
from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest");
- from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort);
-
from("direct:testCustomizedPaho").to("customizedPaho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort);
from("paho:testCustomizedPaho?brokerUrl=tcp://localhost:" + mqttPort).to("mock:testCustomizedPaho");
}
@@ -100,10 +93,10 @@ public class PahoComponentTest extends CamelTestSupport {
// Then
assertEquals("/test/topic", endpoint.getTopic());
- assertEquals("sampleClient", endpoint.getClientId());
- assertEquals("tcp://localhost:" + mqttPort, endpoint.getBrokerUrl());
- assertEquals(2, endpoint.getQos());
- assertEquals(PahoPersistence.FILE, endpoint.getPersistence());
+ assertEquals("sampleClient", endpoint.getConfiguration().getClientId());
+ assertEquals("tcp://localhost:" + mqttPort, endpoint.getConfiguration().getBrokerUrl());
+ assertEquals(2, endpoint.getConfiguration().getQos());
+ assertEquals(PahoPersistence.FILE, endpoint.getConfiguration().getPersistence());
}
@Test
@@ -132,25 +125,6 @@ public class PahoComponentTest extends CamelTestSupport {
}
@Test
- public void shouldUseConnectionOptionsFromRegistry() {
- // Given
- PahoEndpoint pahoWithConnectOptionsFromRegistry = getMandatoryEndpoint("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort,
- PahoEndpoint.class);
-
- // Then
- assertSame(connectOptions, pahoWithConnectOptionsFromRegistry.resolveMqttConnectOptions());
- }
-
- @Test
- public void shouldAutomaticallyUseConnectionOptionsFromRegistry() {
- // Given
- PahoEndpoint pahoWithConnectOptionsFromRegistry = getMandatoryEndpoint("paho:registryConnectOptions?brokerUrl=tcp://localhost:" + mqttPort, PahoEndpoint.class);
-
- // Then
- assertSame(connectOptions, pahoWithConnectOptionsFromRegistry.resolveMqttConnectOptions());
- }
-
- @Test
public void shouldKeepDefaultMessageInHeader() throws InterruptedException, UnsupportedEncodingException {
// Given
final String msg = "msg";
diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java
index 9182566..ef751a4 100644
--- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java
+++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoOverrideTopicTest.java
@@ -17,19 +17,14 @@
package org.apache.camel.component.paho;
import org.apache.activemq.broker.BrokerService;
-import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.After;
import org.junit.Test;
public class PahoOverrideTopicTest extends CamelTestSupport {
- @BindToRegistry("connectOptions")
- MqttConnectOptions connectOptions = new MqttConnectOptions();
-
BrokerService broker;
int mqttPort = AvailablePortFinder.getNextAvailable();