You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/01/07 11:54:57 UTC

[GitHub] vkcelik closed pull request #3246: NIFI-5929 Support for IBM MQ multi-instance queue managers

vkcelik closed pull request #3246: NIFI-5929 Support for IBM MQ multi-instance queue managers
URL: https://github.com/apache/nifi/pull/3246
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
index ecb4e7a538..0528b77ab3 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
@@ -51,10 +51,10 @@
  * <p>
  * It accomplishes it by adjusting current classpath by adding to it the
  * additional resources (i.e., JMS client libraries) provided by the user via
- * {@link JMSConnectionFactoryProviderDefinition#CLIENT_LIB_DIR_PATH}, allowing
+ * {@link JMSConnectionFactoryProvider#CLIENT_LIB_DIR_PATH}, allowing
  * it then to create an instance of the target {@link ConnectionFactory} based
  * on the provided
- * {@link JMSConnectionFactoryProviderDefinition#CONNECTION_FACTORY_IMPL} which
+ * {@link JMSConnectionFactoryProvider#CONNECTION_FACTORY_IMPL} which
  * can be than access via {@link #getConnectionFactory()} method.
  * </p>
  */
@@ -105,8 +105,8 @@
     public static final PropertyDescriptor BROKER_URI = new PropertyDescriptor.Builder()
             .name(BROKER)
             .displayName("Broker URI")
-            .description("URI pointing to the network location of the JMS Message broker. For example, "
-                    + "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ")
+            .description("URI pointing to the network location of the JMS Message broker. Example for ActiveMQ: "
+                    + "'tcp://myhost:61616'. Examples for IBM MQ: 'myhost:1414' and 'myhost01(1414),myhost02(1414)'")
             .addValidator(new NonEmptyBrokerURIValidator())
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -185,23 +185,31 @@ public void disable() {
      * service configuration. For example, 'channel' property will correspond to
      * 'setChannel(..) method and 'queueManager' property will correspond to
      * setQueueManager(..) method with a single argument.
-     * <p>
+     * <br>
      * There are also few adjustments to accommodate well known brokers. For
      * example ActiveMQ ConnectionFactory accepts address of the Message Broker
      * in a form of URL while IBMs in the form of host/port pair (more common).
      * So this method will use value retrieved from the 'BROKER_URI' static
      * property 'as is' if ConnectionFactory implementation is coming from
-     * ActiveMQ and for all others (for now) the 'BROKER_URI' value will be
+     * ActiveMQ or Tibco. For all others (for now) the 'BROKER_URI' value will be
      * split on ':' and the resulting pair will be used to execute
      * setHostName(..) and setPort(..) methods on the provided
-     * ConnectionFactory. This may need to be maintained and adjusted to
-     * accommodate other implementation of ConnectionFactory, but only for
-     * URL/Host/Port issue. All other properties are set as dynamic properties
-     * where user essentially provides both property name and value, The bean
-     * convention is also explained in user manual for this component with links
-     * pointing to documentation of various ConnectionFactories.
+     * ConnectionFactory. An exception to this if the ConnectionFactory
+     * implementation is coming from IBM MQ and multiple brokers are listed,
+     * in this case setConnectionNameList(..) method is executed.
+     * This may need to be maintained and adjusted to accommodate other
+     * implementation of ConnectionFactory, but only for URL/Host/Port issue.
+     * All other properties are set as dynamic properties where user essentially
+     * provides both property name and value, The bean convention is also
+     * explained in user manual for this component with links pointing to
+     * documentation of various ConnectionFactories.
      *
-     * @see #setProperty(String, String) method
+     * @see <a href="http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html#setBrokerURL-java.lang.String-">setBrokerURL(String brokerURL)</a>
+     * @see <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html#setServerUrl(java.lang.String)">setServerUrl(String serverUrl)</a>
+     * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setHostName_java.lang.String_">setHostName(String hostname)</a>
+     * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setPort_int_">setPort(int port)</a>
+     * @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setConnectionNameList_java.lang.String_">setConnectionNameList(String hosts)</a>
+     * @see #setProperty(String propertyName, Object propertyValue)
      */
     private void setConnectionFactoryProperties(ConfigurationContext context) {
         for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
@@ -210,17 +218,23 @@ private void setConnectionFactoryProperties(ConfigurationContext context) {
             if (descriptor.isDynamic()) {
                 this.setProperty(propertyName, entry.getValue());
             } else {
-                if (propertyName.equals(BROKER)) {
-                    String brokerValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
-                    if (context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue().startsWith("org.apache.activemq")) {
+                if (descriptor == BROKER_URI) {
+                    String brokerValue = context.getProperty(BROKER_URI).evaluateAttributeExpressions().getValue();
+                    String connectionFactoryValue = context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
+                    if (connectionFactoryValue.startsWith("org.apache.activemq")) {
                         this.setProperty("brokerURL", brokerValue);
+                    } else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) {
+                        this.setProperty("serverUrl", brokerValue);
                     } else {
+                        // Try to parse broker URI as colon separated host/port pair
                         String[] hostPort = brokerValue.split(":");
+                        // If broker URI indeed was colon separated host/port pair
                         if (hostPort.length == 2) {
                             this.setProperty("hostName", hostPort[0]);
                             this.setProperty("port", hostPort[1]);
-                        } else if (hostPort.length != 2) {
-                            this.setProperty("serverUrl", brokerValue); // for tibco
+                        } else if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) {
+                            // Assuming IBM MQ style broker was specified, e.g. "myhost(1414)" and "myhost01(1414),myhost02(1414)"
+                            this.setProperty("connectionNameList", brokerValue);
                         } else {
                             throw new IllegalArgumentException("Failed to parse broker url: " + brokerValue);
                         }
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
index 25f4398f75..fd5fb203d6 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
@@ -101,4 +101,157 @@ public void validateGetConnectionFactoryFailureIfServiceNotConfigured() throws E
         new JMSConnectionFactoryProvider().getConnectionFactory();
     }
 
+    @Test
+    public void validWithSingleTestBroker() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "org.apache.nifi.jms.testcflib.TestConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithSingleTestBrokerWithScheme() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "tcp://myhost:1234");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "org.apache.nifi.jms.testcflib.TestConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithMultipleTestBrokers() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost01:1234,myhost02:1234");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "org.apache.nifi.jms.testcflib.TestConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithSingleActiveMqBroker() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "tcp://myhost:61616");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "org.apache.activemq.ActiveMQConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithMultipleActiveMqBrokers() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI,
+                "failover:(tcp://myhost01:61616,tcp://myhost02:61616)?randomize=false");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "org.apache.activemq.ActiveMQConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithSingleTibcoBroker() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "tcp://myhost:7222");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "com.tibco.tibjms.TibjmsConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithMultipleTibcoBrokers() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "tcp://myhost01:7222,tcp://myhost02:7222");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "com.tibco.tibjms.TibjmsConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithSingleIbmMqBroker() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost(1414)");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "com.ibm.mq.jms.MQConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
+
+    @Test
+    public void validWithMultipleIbmMqBrokers() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+        JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+        runner.addControllerService("cfProvider", cfProvider);
+
+        String clientLib = this.getClass().getResource("/dummy-lib.jar").toURI().toString();
+
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost01(1414),myhost02(1414)");
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLib);
+        runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
+                "com.ibm.mq.jms.MQConnectionFactory");
+
+        runner.assertValid(cfProvider);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services