You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2019/02/21 20:23:27 UTC

[nifi] branch master updated: NIFI-5660: JMSPublisher should set some header properties in JmsTemplate instead of directly in the message; NIFI-5660: Added lines to integration test to verify these header properties (added by Mike Moser)

This is an automated email from the ASF dual-hosted git repository.

mosermw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0c28ee  NIFI-5660: JMSPublisher should set some header properties in JmsTemplate instead of directly in the message; NIFI-5660: Added lines to integration test to verify these header properties (added by Mike Moser)
a0c28ee is described below

commit a0c28ee19e1b29b30e17946f074a01f94cfbc3c0
Author: Mark Bean <ma...@gmail.com>
AuthorDate: Fri Oct 5 14:08:28 2018 +0000

    NIFI-5660: JMSPublisher should set some header properties in JmsTemplate instead of directly in the message
    NIFI-5660: Added lines to integration test to verify these header properties (added by Mike Moser)
    
    Signed-off-by: Mike Moser <mo...@apache.org>
    
    This closes #3053
---
 .../org/apache/nifi/jms/processors/AbstractJMSProcessor.java     | 5 +++++
 .../main/java/org/apache/nifi/jms/processors/JMSPublisher.java   | 9 ++++++---
 .../org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java   | 4 ++++
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index f47cf78..e51238d 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -33,6 +33,7 @@ import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapte
 import org.springframework.jms.core.JmsTemplate;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
@@ -177,6 +178,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
                 }
             }
             if (worker != null) {
+                worker.jmsTemplate.setExplicitQosEnabled(false);
+                worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+                worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
+                worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
                 workerPool.offer(worker);
             }
         }
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index c13f4b7..506de49 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -84,11 +84,14 @@ final class JMSPublisher extends JMSWorker {
             for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
                 try {
                     if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
-                        message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
+                        this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
+                        this.jmsTemplate.setExplicitQosEnabled(true);
                     } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
-                        message.setJMSExpiration(Integer.parseInt(entry.getValue()));
+                        this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue()));
+                        this.jmsTemplate.setExplicitQosEnabled(true);
                     } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) {
-                        message.setJMSPriority(Integer.parseInt(entry.getValue()));
+                        this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue()));
+                        this.jmsTemplate.setExplicitQosEnabled(true);
                     } else if (entry.getKey().equals(JmsHeaders.REDELIVERED)) {
                         message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue()));
                     } else if (entry.getKey().equals(JmsHeaders.TIMESTAMP)) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index 7812e71..12474ec 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -77,6 +77,8 @@ public class JMSPublisherConsumerIT {
             flowFileAttributes.put("illegal-property", "value");
             flowFileAttributes.put("another.illegal", "value");
             flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
+            flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1");
+            flowFileAttributes.put(JmsHeaders.PRIORITY, "1");
             flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value expected to be integer, make sure non-integer doesn't cause problems
             publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes);
 
@@ -86,6 +88,8 @@ public class JMSPublisherConsumerIT {
             assertFalse(receivedMessage.propertyExists("illegal-property"));
             assertFalse(receivedMessage.propertyExists("another.illegal"));
             assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
+            assertEquals(1, receivedMessage.getJMSDeliveryMode());
+            assertEquals(1, receivedMessage.getJMSPriority());
             assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName());
 
         } finally {