You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/07/01 10:01:20 UTC

[GitHub] [nifi] gardellajuanpablo commented on a change in pull request #4352: NIFI-7563 Optimize the usage of JMS sessions and message producers

gardellajuanpablo commented on a change in pull request #4352:
URL: https://github.com/apache/nifi/pull/4352#discussion_r448255708



##########
File path: nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
##########
@@ -385,6 +390,66 @@ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFac
         }
     }
 
+    /**
+     * <p>
+     * This test validates the optimal resources usage. To process one message is expected to create only one connection, one session and one message producer.
+     * </p>
+     * <p>
+     * See <a href="NIFI-7563">https://issues.apache.org/jira/browse/NIFI-7563</a> for details.
+     * </p>
+     * @throws Exception any error related to the broker.
+     */
+    @Test(timeout = 10000)
+    public void validateNIFI7563() throws Exception {
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            TransportConnector connector = broker.addConnector("tcp://127.0.0.1:0");
+            int port = connector.getServer().getSocketAddress().getPort();
+            broker.start();
+
+            final ActiveMQConnectionFactory innerCf = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
+            ConnectionFactoryInvocationHandler connectionFactoryProxy = new ConnectionFactoryInvocationHandler(innerCf);
+
+            // Create a connection Factory proxy to catch metrics and usage.
+            ConnectionFactory cf = (ConnectionFactory) Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { ConnectionFactory.class }, connectionFactoryProxy);
+
+            TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
+            JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            when(cs.getConnectionFactory()).thenReturn(cf);
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+
+            String destinationName = "myDestinationName";
+            // The destination option according current implementation should contain topic or queue to infer the destination type
+            // from the name. Check https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name can be
+            // randomly created.
+            String topicNameInHeader = "topic-foo";
+            runner.setProperty(PublishJMS.DESTINATION, destinationName);
+            runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
+
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            // This method will be removed once https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
+            flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
+            flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
+            runner.enqueue("hi".getBytes(), flowFileAttributes);
+            runner.enqueue("h2".getBytes(), flowFileAttributes);
+            runner.setThreadCount(1);

Review comment:
       Done, but I messed up during rebase. I have to create another branch from latest changes in master. The changes are now at https://github.com/apache/nifi/pull/4378. Sorry about that, could you please review it soon? That will prevent to me rebase it again :). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org