You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/11/18 07:56:13 UTC

[nifi] branch master updated: NIFI-5929 Support for IBM MQ multi-instance queue managers

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dfbc97  NIFI-5929 Support for IBM MQ multi-instance queue managers
1dfbc97 is described below

commit 1dfbc97c074a5fc5c8e68124e0984504cfa97813
Author: Veli Kerim Celik <ke...@skat.dk>
AuthorDate: Fri Jan 4 17:10:00 2019 +0100

    NIFI-5929 Support for IBM MQ multi-instance queue managers
    
    proper line break
    
    more proper line break :)
    
    Link to external javadocs and add some code comments
    
    Test that properties is set on ConnectionFactory
    
    cleanup
    
    made two static properties optional and elaborated on 'Additional Details' page
    
    minor corrections to user doc
    
    open external links in new tab
    
    Do broker accommodations only if property is set. Add fallback accommodation.
    
    fix test
    
    Add support for colon pair(s) for IBM MQ. In fallback broker handling use first pair if multiple given.
    
    This closes #3246.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi/jms/cf/JMSConnectionFactoryProvider.java  | 129 +++---
 .../additionalDetails.html                         | 199 +++++++---
 .../cf/JMSConnectionFactoryProviderForTest.java    |  49 +++
 .../jms/cf/JMSConnectionFactoryProviderTest.java   | 437 +++++++++++++++++++--
 4 files changed, 685 insertions(+), 129 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
index 781ce65..b831162 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
@@ -17,10 +17,12 @@
 package org.apache.nifi.jms.cf;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import javax.jms.ConnectionFactory;
 import javax.net.ssl.SSLContext;
@@ -39,7 +41,6 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.SSLContextService.ClientAuth;
 import org.slf4j.Logger;
@@ -51,12 +52,10 @@ import org.slf4j.LoggerFactory;
  * <p>
  * It accomplishes it by adjusting current classpath by adding to it the
  * additional resources (i.e., JMS client libraries) provided by the user via
- * {@link JMSConnectionFactoryProviderDefinition#CLIENT_LIB_DIR_PATH}, allowing
- * it then to create an instance of the target {@link ConnectionFactory} based
- * on the provided
- * {@link JMSConnectionFactoryProviderDefinition#CONNECTION_FACTORY_IMPL} which
- * can be than access via {@link #getConnectionFactory()} method.
- * </p>
+ * {@link #CLIENT_LIB_DIR_PATH}, allowing it then to create an instance of the
+ * target {@link ConnectionFactory} based on the provided
+ * {@link #CONNECTION_FACTORY_IMPL} which can then be accessed via the
+ * {@link #getConnectionFactory()} method.
  */
 @Tags({"jms", "messaging", "integration", "queue", "topic", "publish", "subscribe"})
 @CapabilityDescription("Provides a generic service to create vendor specific javax.jms.ConnectionFactory implementations. "
@@ -91,12 +90,12 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
             .build();
     public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder()
             .name(CF_LIB)
-            .displayName("MQ Client Libraries path (i.e., /usr/jms/lib)")
+            .displayName("MQ Client Libraries path (i.e. /usr/jms/lib)")
             .description("Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added "
                     + "to the classpath. Such resources typically represent target MQ client libraries for the "
-                    + "ConnectionFactory implementation.")
+                    + "ConnectionFactory implementation. Required if target is not ActiveMQ.")
             .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
-            .required(true)
+            .required(false)
             .dynamicallyModifiesClasspath(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
@@ -105,10 +104,10 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
     public static final PropertyDescriptor BROKER_URI = new PropertyDescriptor.Builder()
             .name(BROKER)
             .displayName("Broker URI")
-            .description("URI pointing to the network location of the JMS Message broker. For example, "
-                    + "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ")
+            .description("URI pointing to the network location of the JMS Message broker. Example for ActiveMQ: "
+                    + "'tcp://myhost:61616'. Examples for IBM MQ: 'myhost(1414)' and 'myhost01(1414),myhost02(1414)'")
             .addValidator(new NonEmptyBrokerURIValidator())
-            .required(true)
+            .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
@@ -160,7 +159,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
     }
 
     @OnEnabled
-    public void enable(ConfigurationContext context) throws InitializationException {
+    public void enable(ConfigurationContext context) {
         try {
             if (!this.configured) {
                 if (logger.isInfoEnabled()) {
@@ -192,55 +191,83 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
      * invoked to set the corresponding property to a value provided by during
      * service configuration. For example, 'channel' property will correspond to
      * 'setChannel(..) method and 'queueManager' property will correspond to
-     * setQueueManager(..) method with a single argument.
+     * setQueueManager(..) method with a single argument. The bean convention is also
+     * explained in user manual for this component with links pointing to
+     * documentation of various ConnectionFactories.
      * <p>
      * There are also few adjustments to accommodate well known brokers. For
      * example ActiveMQ ConnectionFactory accepts address of the Message Broker
-     * in a form of URL while IBMs in the form of host/port pair (more common).
-     * So this method will use value retrieved from the 'BROKER_URI' static
-     * property 'as is' if ConnectionFactory implementation is coming from
-     * ActiveMQ and for all others (for now) the 'BROKER_URI' value will be
-     * split on ':' and the resulting pair will be used to execute
-     * setHostName(..) and setPort(..) methods on the provided
-     * ConnectionFactory. This may need to be maintained and adjusted to
-     * accommodate other implementation of ConnectionFactory, but only for
-     * URL/Host/Port issue. All other properties are set as dynamic properties
-     * where user essentially provides both property name and value, The bean
-     * convention is also explained in user manual for this component with links
-     * pointing to documentation of various ConnectionFactories.
+     * in a form of URL while IBMs in the form of host/port pair(s).
+     * <p>
+     * This method will use the value retrieved from the 'BROKER_URI' static
+     * property as is for ActiveMQ (setBrokerURL) and TIBCO (setServerUrl). For
+     * IBM MQ the value is split on ',' and each colon separated host/port pair
+     * is rewritten as 'host(port)' before calling setConnectionNameList(..).
+     * For any other ConnectionFactory the first entry is split on ':' and, if
+     * it is a host/port pair, used to execute setHostName(..) and setPort(..)
+     * on the provided ConnectionFactory.
+     * <p>
+     * This method may need to be maintained and adjusted to accommodate other
+     * implementation of ConnectionFactory, but only for URL/Host/Port issue.
+     * All other properties are set as dynamic properties where user essentially
+     * provides both property name and value.
      *
-     * @see #setProperty(String, String) method
+     * @see <a href="http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html#setBrokerURL-java.lang.String-">setBrokerURL(String brokerURL)</a>
+     * @see <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html#setServerUrl(java.lang.String)">setServerUrl(String serverUrl)</a>
+     * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setHostName_java.lang.String_">setHostName(String hostname)</a>
+     * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setPort_int_">setPort(int port)</a>
+     * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setConnectionNameList_java.lang.String_">setConnectionNameList(String hosts)</a>
+     * @see #setProperty(String propertyName, Object propertyValue)
      */
-    private void setConnectionFactoryProperties(ConfigurationContext context) {
-        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
-            PropertyDescriptor descriptor = entry.getKey();
-            String propertyName = descriptor.getName();
-            if (descriptor.isDynamic()) {
-                this.setProperty(propertyName, entry.getValue());
+    void setConnectionFactoryProperties(ConfigurationContext context) {
+        if (context.getProperty(BROKER_URI).isSet()) {
+            String brokerValue = context.getProperty(BROKER_URI).evaluateAttributeExpressions().getValue();
+            String connectionFactoryValue = context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
+            if (connectionFactoryValue.startsWith("org.apache.activemq")) {
+                this.setProperty("brokerURL", brokerValue);
+            } else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) {
+                this.setProperty("serverUrl", brokerValue);
             } else {
-                if (propertyName.equals(BROKER)) {
-                    String brokerValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
-                    if (context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue().startsWith("org.apache.activemq")) {
-                        this.setProperty("brokerURL", brokerValue);
-                    } else {
-                        String[] hostPort = brokerValue.split(":");
+                String[] brokerList = brokerValue.split(",");
+                if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) {
+                    List<String> ibmConList = new ArrayList<String>();
+                    for (String broker : brokerList) {
+                        String[] hostPort = broker.split(":");
                         if (hostPort.length == 2) {
-                            this.setProperty("hostName", hostPort[0]);
-                            this.setProperty("port", hostPort[1]);
-                        } else if (hostPort.length != 2) {
-                            this.setProperty("serverUrl", brokerValue); // for tibco
+                            ibmConList.add(hostPort[0]+"("+hostPort[1]+")");
                         } else {
-                            throw new IllegalArgumentException("Failed to parse broker url: " + brokerValue);
+                            ibmConList.add(broker);
                         }
                     }
-                    SSLContextService sc = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-                    if (sc != null) {
-                        SSLContext ssl = sc.createSSLContext(ClientAuth.NONE);
-                        this.setProperty("sSLSocketFactory", ssl.getSocketFactory());
+                    this.setProperty("connectionNameList", String.join(",", ibmConList));
+                } else {
+                    // Try to parse broker URI as colon separated host/port pair. Use first pair if multiple given.
+                    String[] hostPort = brokerList[0].split(":");
+                    if (hostPort.length == 2) {
+                        // If broker URI indeed was colon separated host/port pair
+                        this.setProperty("hostName", hostPort[0]);
+                        this.setProperty("port", hostPort[1]);
                     }
-                } // ignore 'else', since it's the only non-dynamic property that is relevant to CF configuration
+                }
             }
         }
+
+        SSLContextService sc = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sc != null) {
+            SSLContext ssl = sc.createSSLContext(ClientAuth.NONE);
+            this.setProperty("sSLSocketFactory", ssl.getSocketFactory());
+        }
+
+        List<Entry<PropertyDescriptor, String>> dynamicProperties = context.getProperties().entrySet().stream()
+                .filter(entry -> entry.getKey().isDynamic())
+                .collect(Collectors.toList());
+
+        for (Entry<PropertyDescriptor, String> entry : dynamicProperties) {
+            PropertyDescriptor descriptor = entry.getKey();
+            String propertyName = descriptor.getName();
+            String propertyValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+            this.setProperty(propertyName, propertyValue);
+        }
     }
 
     /**
@@ -261,7 +288,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
      * follow bean convention and all their properties using Java primitives as
      * arguments.
      */
-    private void setProperty(String propertyName, Object propertyValue) {
+    void setProperty(String propertyName, Object propertyValue) {
         String methodName = this.toMethodName(propertyName);
         Method[] methods = Utils.findMethods(methodName, this.connectionFactory.getClass());
         if (methods != null && methods.length > 0) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html
index d1e1325..0e92a65 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html
@@ -1,56 +1,155 @@
 <!DOCTYPE html>
 <html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>JMSConnectionFactoryProvider</title>
-        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
-    </head>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>JMSConnectionFactoryProvider</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
 
-    <body>
-        <h2>Description:</h2>
-        <p>
-            This ControllerService serves as a general factory service to serving vendor specific 
-            instances of the <i>javax.jms.ConnectionFactory</i>. It does so by allowing user to 
-            configure vendor specific properties as well as point to the location of the vendor 
-            provided JMS client libraries so the correct implementation of the <i>javax.jms.ConnectionFactory</i>
-            can be found, loaded, instantiated and served to the dependent Processors (see PublishJMS, ConsumeJMS).
-        </p>
-        <p>
-            To accommodate variety of JMS vendors and their implementation of the <i>ConnectionFactory</i> 
-            this ControllerService exposes only 3 static configuration properties that are common across many implementations 
-            of the <i>ConnectionFactory</i>. The rest of the configuration properties are set following 
-            <a href="http://docstore.mik.ua/orelly/java-ent/jnut/ch06_02.htm">Java Beans</a> convention (see below).
-	    </p>
-	    <p>
-        The 3 static configuration properties are:
+<body>
+<h2>Description</h2>
+<p>
+    This controller service serves as a general factory service for serving vendor specific
+    instances of the <i>javax.jms.ConnectionFactory</i>. It does so by allowing user to
+    configure vendor specific properties as well as point to the location of the vendor
+    provided JMS client libraries so the correct implementation of the <i>javax.jms.ConnectionFactory</i>
+    can be found, loaded, instantiated and served to the dependent processors (see PublishJMS, ConsumeJMS).
+</p>
+<p>
+    All JMS vendors and <i>ConnectionFactory</i> implementations are supported as long as the configuration values can
+    be set through <i>set</i> methods (detailed explanation in the last paragraph). However, some helpful accommodations
+    are made for the following JMS vendors:
+</p>
+<ul>
+    <li>Apache ActiveMQ</li>
+    <li>IBM MQ</li>
+    <li>TIBCO EMS</li>
+</ul>
+<p>
+    This controller service exposes only a single mandatory static configuration property that is required across all
+    implementations. The rest of the configuration properties are either optional or vendor specific.
+</p>
+<p>
+    The mandatory configuration property is:
+</p>
+<ul>
+    <li><b>MQ ConnectionFactory Implementation</b> - A fully qualified name of the JMS <i>ConnectionFactory</i>
+        implementation class. For example:
         <ul>
-  		  <li><b>MQ ConnectionFactory Implementation</b> - A fully qualified name of the JMS ConnectionFactory implementation 
-  		         class (i.e., org.apache.activemq.ActiveMQConnectionFactory)</li>
-          <li><b>MQ Client Libraries path</b> - Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added 
-							to the classpath. Such resources typically represent target MQ client libraries for the ConnectionFactory 
-							implementation. It is optional if you are using Apache ActiveMQ since its libraries are distributed with this component.</li>
-          <li><b>Broker URI</b> - URI pointing to the network location of the JMS Message broker. For example, 'tcp://myhost:61616' for ActiveMQ or simply 'myhost:1414'.</li>
+            <li>Apache ActiveMQ - <a href="http://activemq.apache.org/maven/5.15.9/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html" target="_blank">org.apache.activemq.ActiveMQConnectionFactory</a></li>
+            <li>IBM MQ - <a href="https://www-01.ibm.com/support/knowledgecenter/SSFKSJ_8.0.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQQueueConnectionFactory.html" target="_blank">com.ibm.mq.jms.MQQueueConnectionFactory</a></li>
+            <li>TIBCO EMS - <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsQueueConnectionFactory.html" target="_blank">com.tibco.tibjms.TibjmsQueueConnectionFactory</a></li>
         </ul>
-        The rest of the properties are set as Dynamic Properties following <a href="http://docstore.mik.ua/orelly/java-ent/jnut/ch06_02.htm">Java Beans</a> 
-        convention where a property name is derived from the <i>set*</i> method of the vendor specific ConnectionFactory's implementation.
-        For example, <i>com.ibm.mq.jms.MQConnectionFactory.setChannel(String)</i> would imply 'channel' property and 
-        <i>com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)</i> would imply 'transportType' property.
-        For the list of available properties please consult vendor provided documentation. Here is an example for 
-        <a href="https://www-01.ibm.com/support/knowledgecenter/SSFKSJ_8.0.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQQueueConnectionFactory.html">IBM's com.ibm.mq.jms.MQConnectionFactory</a>
-        </p>
-    </body>
+    </li>
+</ul>
+<p>
+    The following static configuration properties are optional but required in many cases:
+</p>
+<ul>
+    <li><b>MQ Client Libraries path</b> - Path to the directory with additional resources (i.e. JARs,
+        configuration files, etc.) to be added to the classpath. Such resources typically represent target client
+        libraries for the <i>ConnectionFactory</i> implementation. It is optional if you are using Apache ActiveMQ since
+        its libraries are included with this component.
+    </li>
+    <li><b>Broker URI</b> - URI pointing to the network location of the JMS Message broker. For example:
+        <ul>
+            <li>Apache ActiveMQ - <i>tcp://myhost:1234</i> for single broker and
+                <i>failover:(tcp://myhost01:1234,tcp://myhost02:1234)</i> for multiple brokers.
+            </li>
+            <li>IBM MQ - <i>myhost(1234)</i> for single broker. <i>myhost01(1234),myhost02(1234)</i> for multiple
+                brokers.
+            </li>
+            <li>TIBCO EMS - <i>tcp://myhost:1234</i> for single broker and
+                <i>tcp://myhost01:7222,tcp://myhost02:7222</i> for multiple brokers.
+            </li>
+        </ul>
+    </li>
+</ul>
+<p>
+    The rest of the vendor specific configuration is set through dynamic properties utilizing the
+    <a href="http://docstore.mik.ua/orelly/java-ent/jnut/ch06_02.htm" target="_blank">Java Beans</a> convention where a property
+    name is derived from the <i>set</i> method of the vendor specific <i>ConnectionFactory</i>'s implementation. For
+    example, <i>com.ibm.mq.jms.MQConnectionFactory.setChannel(String)</i> would imply 'channel' property and
+    <i>com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)</i> would imply 'transportType' property. For the list
+    of available properties please consult vendor provided documentation. The following are examples of such vendor provided
+    documentation:
+</p>
+<ul>
+    <li><a href="http://activemq.apache.org/maven/5.15.9/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html" target="_blank">Apache
+        ActiveMQ</a></li>
+    <li>
+        <a href="https://www.ibm.com/support/knowledgecenter/SSFKSJ_8.0.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html" target="_blank">IBM
+            MQ</a></li>
+    <li>
+        <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html" target="_blank">TIBCO
+            EMS</a></li>
+</ul>
+
+<h2>Sample controller service configuration for IBM MQ</h2>
+<table>
+    <tr>
+        <th>Property</th>
+        <th>Value</th>
+        <th>Static/Dynamic</th>
+        <th>Comments</th>
+    </tr>
+    <tr>
+        <td>MQ ConnectionFactory Implementation</td>
+        <td>com.ibm.mq.jms.MQQueueConnectionFactory</td>
+        <td>Static</td>
+        <td>Vendor provided implementation of QueueConnectionFactory</td>
+    </tr>
+    <tr>
+        <td>MQ Client Libraries path (i.e. /usr/jms/lib)</td>
+        <td>/opt/mqm/java/lib</td>
+        <td>Static</td>
+        <td>Default installation path of client JAR files on Linux systems</td>
+    </tr>
+    <tr>
+        <td>Broker URI</td>
+        <td>mqhost01(1414),mqhost02(1414)</td>
+        <td>Static</td>
+        <td><a href="https://www.ibm.com/support/knowledgecenter/ro/SSAW57_9.0.0/com.ibm.websphere.nd.multiplatform.doc/ae/ucli_pqcfm.html#MQTopicConnectionFactory_enterporthostname" target="_blank">Connection Name List syntax</a>.
+            Colon separated host/port pair(s) are also supported</td>
+    </tr>
+    <tr>
+        <td>SSL Context Service</td>
+        <td></td>
+        <td>Static</td>
+        <td>Only required if using SSL/TLS</td>
+    </tr>
+    <tr>
+        <td>channel</td>
+        <td>TO.BAR</td>
+        <td>Dynamic</td>
+        <td>Required when using the client transport mode</td>
+    </tr>
+    <tr>
+        <td>queueManager</td>
+        <td>PQM1</td>
+        <td>Dynamic</td>
+        <td>Name of queue manager. Always required.</td>
+    </tr>
+    <tr>
+        <td>transportType</td>
+        <td>1</td>
+        <td>Dynamic</td>
+        <td>Constant integer value corresponding to the client transport mode. Default value is <a href="https://www.ibm.com/support/knowledgecenter/en/SSEQTP_9.0.0/com.ibm.websphere.base.doc/ae/umj_pjcfm.html" target="_blank">"Bindings, then client"</a></td>
+    </tr>
+</table>
+</body>
 </html>
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
new file mode 100644
index 0000000..e2df94f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.cf;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sub-class of {@link JMSConnectionFactoryProvider} only for testing purpose
+ */
+public class JMSConnectionFactoryProviderForTest extends JMSConnectionFactoryProvider {
+    private static Logger logger = LoggerFactory.getLogger(JMSConnectionFactoryProviderForTest.class);
+
+    private Map<String, Object> setProperties = new HashMap<>();
+
+    @OnEnabled
+    @Override
+    public void enable(ConfigurationContext context) {
+        setConnectionFactoryProperties(context);
+    }
+
+    @Override
+    void setProperty(String propertyName, Object propertyValue) {
+        setProperties.put(propertyName, propertyValue);
+    }
+
+    public Map<String, Object> getSetProperties() {
+        return setProperties;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
index 25f4398..b50a3d4 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
@@ -16,89 +16,470 @@
  */
 package org.apache.nifi.jms.cf;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 /**
- *
+ * Tests for {@link JMSConnectionFactoryProvider}
  */
 public class JMSConnectionFactoryProviderTest {
 
     private static Logger logger = LoggerFactory.getLogger(JMSConnectionFactoryProviderTest.class);
 
+    private static final String HOSTNAME = "myhost";
+    private static final String PORT = "1234";
+
+    private static final String SINGLE_TEST_BROKER = HOSTNAME + ":" + PORT; // i.e. "myhost:1234"
+    private static final String SINGLE_TEST_BROKER_WITH_SCHEME = "tcp://myhost:1234";
+    private static final String SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP = "tcp://0.0.0.0:616161"; // NOTE(review): 616161 is above the 65535 port limit; literal carried over from the replaced inline value - confirm intent
+    private static final String MULTIPLE_TEST_BROKERS = "myhost01:1234,myhost02:1234";
+    private static final String SINGLE_ACTIVEMQ_BROKER = "tcp://myhost:61616";
+    private static final String MULTIPLE_ACTIVEMQ_BROKERS = "failover:(tcp://myhost01:61616,tcp://myhost02:61616)?randomize=false";
+    private static final String SINGLE_TIBCO_BROKER = "tcp://myhost:7222";
+    private static final String MULTIPLE_TIBCO_BROKERS = "tcp://myhost01:7222,tcp://myhost02:7222";
+    private static final String SINGLE_IBM_MQ_BROKER = "myhost(1414)";
+    private static final String MULTIPLE_IBM_MQ_BROKERS = "myhost01(1414),myhost02(1414)";
+    private static final String MULTIPLE_IBM_MQ_MIXED_BROKERS = "myhost01:1414,myhost02(1414)"; // mixes the host:port and host(port) notations
+    private static final String MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS = "myhost01:1414,myhost02:1414"; // host:port notation only
+
+    private static final String TEST_CONNECTION_FACTORY_IMPL = "org.apache.nifi.jms.testcflib.TestConnectionFactory";
+    private static final String ACTIVEMQ_CONNECTION_FACTORY_IMPL = "org.apache.activemq.ActiveMQConnectionFactory";
+    private static final String TIBCO_CONNECTION_FACTORY_IMPL = "com.tibco.tibjms.TibjmsConnectionFactory";
+    private static final String IBM_MQ_CONNECTION_FACTORY_IMPL = "com.ibm.mq.jms.MQConnectionFactory";
+
+    private static final String controllerServiceId = "cfProvider"; // id under which each test registers the provider with the TestRunner
+
+    private static final String DUMMY_JAR_1 = "dummy-lib.jar";
+    private static final String DUMMY_JAR_2 = "dummy-lib-2.jar";
+    private static final String DUMMY_CONF = "dummy.conf";
+
+    private String dummyResource; // URI string of DUMMY_JAR_1, assigned in prepareTest()
+    private String allDummyResources; // comma-separated URI strings of both jars and the conf file, assigned in prepareTest()
+
+    @Before
+    public void prepareTest() throws URISyntaxException {
+        dummyResource = getClass().getResource("/" + DUMMY_JAR_1).toURI().toString(); // resolved through this test class
+        final String secondJar = getClass().getResource("/" + DUMMY_JAR_2).toURI().toString();
+        final String conf = getClass().getResource("/" + DUMMY_CONF).toURI().toString();
+        allDummyResources = dummyResource + "," + secondJar + "," + conf + ","; // trailing comma kept, as in the removed inline code
+    }
+
     @Test
-    public void validateNotValidForNonExistingLibPath() throws Exception {
+    public void validateNotValidForNonExistingLibPath() throws InitializationException { // "foo" is presumably a path that does not exist, so the service must be reported invalid
         TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
         JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
-        runner.addControllerService("cfProvider", cfProvider);
-        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
+        runner.addControllerService(controllerServiceId, cfProvider);
 
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
         runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "foo");
-        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
-                "org.apache.nifi.jms.testcflib.TestConnectionFactory");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
         runner.assertNotValid(cfProvider);
     }
 
     @Test
-    public void validateELExpression() throws InitializationException, URISyntaxException {
+    public void validateELExpression() throws InitializationException { // broker URI and client lib are supplied through ${...} variables
         TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
         runner.setValidateExpressionUsage(true);
+
         JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
-        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
-        runner.addControllerService("cfProvider", cfProvider);
+        runner.addControllerService(controllerServiceId, cfProvider);
 
-        runner.setVariable("broker.uri", "tcp://0.0.0.0:616161");
-        runner.setVariable("client.lib", clientLib);
+        runner.setVariable("broker.uri", SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP); // referenced below as ${broker.uri}
+        runner.setVariable("client.lib", dummyResource); // referenced below as ${client.lib}
 
         runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "${broker.uri}");
         runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "${client.lib}");
-        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
-                "org.apache.nifi.jms.testcflib.TestConnectionFactory");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
         runner.assertValid(cfProvider);
     }
 
     @Test
-    public void testClientLibResourcesLoaded() throws InitializationException, URISyntaxException {
+    public void testClientLibResourcesLoaded() throws InitializationException { // client-lib property lists two jars and a conf file via ${client.lib}
         TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
         runner.setValidateExpressionUsage(true);
 
         JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService(controllerServiceId, cfProvider);
 
-        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString() + "," +
-                           this.getClass().getResource("/dummy-lib-2.jar").toURI().toString() + "," +
-                           this.getClass().getResource("/dummy.conf").toURI().toString() + ",";
-
-        runner.addControllerService("cfProvider", cfProvider);
-
-        runner.setVariable("broker.uri", "tcp://0.0.0.0:616161");
-        runner.setVariable("client.lib", clientLib);
+        runner.setVariable("broker.uri", SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP);
+        runner.setVariable("client.lib", allDummyResources); // comma-separated list that ends with a trailing comma
 
         runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "${broker.uri}");
         runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "${client.lib}");
-        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
-                "org.apache.nifi.jms.testcflib.TestConnectionFactory");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
 
         runner.assertValid(cfProvider);
 
         ClassLoader loader = runner.getClass().getClassLoader();
-        Assert.assertTrue(loader.getResource("dummy.conf") != null);
-        Assert.assertTrue(loader.getResource("dummy-lib.jar") != null);
-        Assert.assertTrue(loader.getResource("dummy-lib-2.jar") != null);
+        Assert.assertNotNull(loader.getResource(DUMMY_CONF)); // NOTE(review): 'loader' is the class loader of the TestRunner class itself - confirm this proves the service loaded the libs
+        Assert.assertNotNull(loader.getResource(DUMMY_JAR_1));
+        Assert.assertNotNull(loader.getResource(DUMMY_JAR_2));
     }
 
     @Test(expected = IllegalStateException.class)
-    public void validateGetConnectionFactoryFailureIfServiceNotConfigured() throws Exception {
+    public void validateGetConnectionFactoryFailureIfServiceNotConfigured() { // a freshly constructed provider must refuse to hand out a factory
         new JMSConnectionFactoryProvider().getConnectionFactory();
     }
 
+    /** A single {@code host:port} broker combined with the test connection factory passes validation. */
+    @Test
+    public void validWithSingleTestBroker() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A single broker URI carrying a {@code tcp://} scheme, combined with the test connection factory, passes validation. */
+    @Test
+    public void validWithSingleTestBrokerWithScheme() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A comma-separated list of {@code host:port} brokers combined with the test connection factory passes validation. */
+    @Test
+    public void validWithMultipleTestBrokers() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TEST_BROKERS);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A single {@code tcp://} broker URI combined with the ActiveMQ connection factory class name passes validation. */
+    @Test
+    public void validWithSingleActiveMqBroker() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A {@code failover:(...)} URI naming two brokers, combined with the ActiveMQ connection factory class name, passes validation. */
+    @Test
+    public void validWithMultipleActiveMqBrokers() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A single {@code tcp://} broker URI combined with the TIBCO connection factory class name passes validation. */
+    @Test
+    public void validWithSingleTibcoBroker() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TIBCO_BROKER);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A comma-separated list of {@code tcp://} broker URIs combined with the TIBCO connection factory class name passes validation. */
+    @Test
+    public void validWithMultipleTibcoBrokers() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TIBCO_BROKERS);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A single broker in {@code host(port)} notation combined with the IBM MQ connection factory class name passes validation. */
+    @Test
+    public void validWithSingleIbmMqBroker() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_IBM_MQ_BROKER);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A comma-separated list of {@code host(port)} brokers combined with the IBM MQ connection factory class name passes validation. */
+    @Test
+    public void validWithMultipleIbmMqBrokers() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A broker list mixing {@code host:port} and {@code host(port)} notation, with the IBM MQ factory class name, passes validation. */
+    @Test
+    public void validWithMultipleIbmMqMixedBrokers() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** A list of {@code host:port} (colon) pairs with the IBM MQ factory class name passes validation. ("Color" in the name means "Colon".) */
+    @Test
+    public void validWithMultipleIbmMqColorPairBrokers() throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
+        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();
+        testRunner.addControllerService(controllerServiceId, provider);
+
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        testRunner.assertValid(provider);
+    }
+
+    /** Test factory with a single {@code host:port} broker: expects the recorded properties to be exactly {@code hostName} and {@code port}. */
+    @Test
+    public void propertiesSetOnSingleTestBrokerConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("hostName", HOSTNAME, "port", PORT), cfProvider.getSetProperties());
+    }
+
+    /** Test factory with a broker URI that has a scheme: expects no properties to be recorded. ("Schema" in the name means the URI scheme.) */
+    @Test
+    public void propertiesSetOnSingleTestBrokerWithSchemaConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of(), cfProvider.getSetProperties());
+    }
+
+    /** Test factory with several {@code host:port} brokers: expects only the first pair to be recorded as {@code hostName}/{@code port}. */
+    @Test
+    public void propertiesSetOnMultipleTestBrokersConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TEST_BROKERS);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("hostName", "myhost01", "port", "1234"), cfProvider.getSetProperties());
+    }
+
+    /** ActiveMQ factory with a single broker URI: expects the URI to be recorded unchanged as {@code brokerURL}. */
+    @Test
+    public void propertiesSetOnSingleActiveMqBrokerConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("brokerURL", SINGLE_ACTIVEMQ_BROKER), cfProvider.getSetProperties());
+    }
+
+    /** ActiveMQ factory with a {@code failover:(...)} URI: expects the whole URI to be recorded unchanged as {@code brokerURL}. */
+    @Test
+    public void propertiesSetOnMultipleActiveMqBrokersConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("brokerURL", MULTIPLE_ACTIVEMQ_BROKERS), cfProvider.getSetProperties());
+    }
+
+    /** TIBCO factory with a single broker URI: expects the URI to be recorded unchanged as {@code serverUrl}. */
+    @Test
+    public void propertiesSetOnSingleTibcoBrokerConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TIBCO_BROKER);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("serverUrl", SINGLE_TIBCO_BROKER), cfProvider.getSetProperties());
+    }
+
+    /** TIBCO factory with a comma-separated broker list: expects the whole list to be recorded unchanged as {@code serverUrl}. */
+    @Test
+    public void propertiesSetOnMultipleTibcoBrokersConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TIBCO_BROKERS);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("serverUrl", MULTIPLE_TIBCO_BROKERS), cfProvider.getSetProperties());
+    }
+
+    /** IBM MQ factory with a single {@code host(port)} broker: expects it to be recorded unchanged as {@code connectionNameList}. */
+    @Test
+    public void propertiesSetOnSingleIbmMqBrokerConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_IBM_MQ_BROKER);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("connectionNameList", SINGLE_IBM_MQ_BROKER), cfProvider.getSetProperties());
+    }
+
+    /** IBM MQ factory with several {@code host(port)} brokers: expects the list to be recorded unchanged as {@code connectionNameList}. */
+    @Test
+    public void propertiesSetOnMultipleIbmMqBrokersConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS), cfProvider.getSetProperties());
+    }
+
+    /** IBM MQ factory with mixed {@code host:port}/{@code host(port)} brokers: expects a uniform {@code host(port)} {@code connectionNameList}. */
+    @Test
+    public void propertiesSetOnMultipleIbmMqMixedBrokersConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS), cfProvider.getSetProperties());
+    }
+
+    /** IBM MQ factory with several {@code host:port} pairs: expects them to be recorded in {@code host(port)} form as {@code connectionNameList}. */
+    @Test
+    public void propertiesSetOnMultipleIbmMqColonPairBrokersConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS), cfProvider.getSetProperties());
+    }
+
+    /** IBM MQ factory with one {@code host:port} pair: expects it to be recorded in {@code host(port)} form as {@code connectionNameList}. */
+    @Test
+    public void propertiesSetOnSingleIbmMqColonSeparatedPairBrokerConnectionFactory() throws InitializationException {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+        JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+        runner.addControllerService(controllerServiceId, cfProvider);
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
+
+        runner.enableControllerService(cfProvider);
+        // expected value first, per JUnit's assertEquals(expected, actual), so a failure message labels the two sides correctly
+        assertEquals(ImmutableMap.of("connectionNameList", HOSTNAME+"("+PORT+")"), cfProvider.getSetProperties());
+    }
 }