You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by de...@apache.org on 2018/10/02 13:43:26 UTC

[1/3] nifi git commit: NIFI-3672 Add support for strongly typed message properties in PublishJMS

Repository: nifi
Updated Branches:
  refs/heads/master 813cc1f6a -> 895323f3c


NIFI-3672 Add support for strongly typed message properties in PublishJMS


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/66eeb488
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/66eeb488
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/66eeb488

Branch: refs/heads/master
Commit: 66eeb48802317cdff69fe83070d26dac7245294a
Parents: 5106dc0
Author: Mike Moser <mo...@apache.org>
Authored: Mon Aug 13 17:40:54 2018 +0000
Committer: Mike Moser <mo...@apache.org>
Committed: Tue Aug 14 16:37:35 2018 +0000

----------------------------------------------------------------------
 .../nifi/jms/processors/JMSPublisher.java       | 70 +++++++++++++++++--
 .../nifi/jms/processors/PublishJMSIT.java       | 73 ++++++++++++++++++++
 2 files changed, 139 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/66eeb488/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 9912c81..392157f 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.jms.processors;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
@@ -78,11 +81,14 @@ final class JMSPublisher extends JMSWorker {
 
     void setMessageHeaderAndProperties(final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
         if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
-            for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
+
+            Map<String, String> flowFileAttributesToSend = flowFileAttributes.entrySet().stream()
+                    .filter(entry -> !entry.getKey().contains("-") && !entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property names
+                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+            for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
                 try {
-                    if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names
-                        message.setStringProperty(entry.getKey(), entry.getValue());
-                    } else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
+                    if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
                         message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
                     } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
                         message.setJMSExpiration(Integer.parseInt(entry.getValue()));
@@ -110,6 +116,11 @@ final class JMSPublisher extends JMSWorker {
                         } else {
                             logUnbuildableDestination(entry.getKey(), JmsHeaders.DESTINATION);
                         }
+                    } else {
+                        // not a special attribute handled above, so send it as a property using the specified property type
+                        String type = flowFileAttributes.getOrDefault(entry.getKey().concat(".type"), "unknown").toLowerCase();
+                        propertySetterMap.getOrDefault(type, JmsPropertySetterEnum.STRING)
+                                .setProperty(message, entry.getKey(), entry.getValue());
                     }
                 } catch (NumberFormatException ne) {
                     this.processLog.warn("Incompatible value for attribute " + entry.getKey()
@@ -146,4 +157,55 @@ final class JMSPublisher extends JMSWorker {
 
         return destination;
     }
+
+    /**
+     * Strategy for writing a single message property: an implementation takes the String {@code value} and stores it under {@code name} through one of the typed property setters of {@link javax.jms.Message}.
+     */
+    public interface JmsPropertySetter {
+        void setProperty(final Message message, final String name, final String value) throws JMSException, NumberFormatException; // NumberFormatException is unchecked; presumably listed so callers know value parsing can fail
+    }
+
+    public enum JmsPropertySetterEnum implements JmsPropertySetter { // one constant per supported property type; each wraps a lambda that converts the String value and calls the matching typed Message setter
+        BOOLEAN( (message, name, value) -> {
+            message.setBooleanProperty(name, Boolean.parseBoolean(value)); // parseBoolean never throws: any text other than "true" (ignoring case) becomes false
+        } ),
+        BYTE( (message, name, value) -> {
+            message.setByteProperty(name, Byte.parseByte(value)); // NumberFormatException if value is not a parsable byte
+        } ),
+        SHORT( (message, name, value) -> {
+            message.setShortProperty(name, Short.parseShort(value)); // NumberFormatException if value is not a parsable short
+        } ),
+        INTEGER( (message, name, value) -> {
+            message.setIntProperty(name, Integer.parseInt(value)); // NumberFormatException if value is not a parsable int
+        } ),
+        LONG( (message, name, value) -> {
+            message.setLongProperty(name, Long.parseLong(value)); // NumberFormatException if value is not a parsable long
+        } ),
+        FLOAT( (message, name, value) -> {
+            message.setFloatProperty(name, Float.parseFloat(value)); // NumberFormatException if value is not a parsable float
+        } ),
+        DOUBLE( (message, name, value) -> {
+            message.setDoubleProperty(name, Double.parseDouble(value)); // NumberFormatException if value is not a parsable double
+        } ),
+        STRING( (message, name, value) -> {
+            message.setStringProperty(name, value); // no conversion: the value is stored as given
+        } );
+
+        private final JmsPropertySetter setter; // the conversion-and-set lambda passed to this constant's constructor
+        JmsPropertySetterEnum(JmsPropertySetter setter) {
+            this.setter = setter;
+        }
+
+        public void setProperty(Message message, String name, String value) throws JMSException, NumberFormatException { // implements JmsPropertySetter by delegating to this constant's lambda
+            setter.setProperty(message, name, value);
+        }
+    }
+
+    /**
+     * Maps each lower-case type name ("boolean", "byte", "short", "integer", "long", "float", "double", "string") to its setter; this avoids JmsPropertySetterEnum.valueOf and dealing with IllegalArgumentException on failed lookup.
+     */
+    public static Map<String, JmsPropertySetterEnum> propertySetterMap = new HashMap<>(); // NOTE(review): public and mutable; treat as read-only after class initialization
+    static {
+        Arrays.stream(JmsPropertySetterEnum.values()).forEach(e -> propertySetterMap.put(e.name().toLowerCase(java.util.Locale.ROOT), e)); // Locale.ROOT keeps keys ASCII under any default locale (in Turkish, "INTEGER".toLowerCase() is "ınteger"); lookups should lower-case the same way
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/66eeb488/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index a365ad5..fa0bd7a 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -180,4 +181,76 @@ public class PublishJMSIT {
 
         runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
     }
+
+    @Test(timeout = 10000)
+    public void validatePublishPropertyTypes() throws Exception { // publishes one FlowFile carrying typed attributes, then checks the Java type and value of each property on the received JMS message
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationName = "validatePublishPropertyTypes";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); // mocked controller service that hands the processor the connection factory created above
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+        Map<String, String> attributes = new HashMap<>(); // each "<name>.type" attribute selects the JMS property type used for attribute "<name>"
+        attributes.put("foo", "foo"); // no "foo.type" attribute, so expected to arrive as a String
+        attributes.put("myboolean", "true");
+        attributes.put("myboolean.type", "boolean");
+        attributes.put("mybyte", "127");
+        attributes.put("mybyte.type", "byte");
+        attributes.put("myshort", "16384");
+        attributes.put("myshort.type", "short");
+        attributes.put("myinteger", "1544000");
+        attributes.put("myinteger.type", "INTEGER"); // test upper case
+        attributes.put("mylong", "9876543210"); // larger than Integer.MAX_VALUE, so it only fits in a long
+        attributes.put("mylong.type", "long");
+        attributes.put("myfloat", "3.14");
+        attributes.put("myfloat.type", "float");
+        attributes.put("mydouble", "3.14159265359");
+        attributes.put("mydouble.type", "double");
+        attributes.put("badtype", "3.14");
+        attributes.put("badtype.type", "pi"); // pi not recognized as a type, so send as String
+        attributes.put("badint", "3.14"); // value is not an integer
+        attributes.put("badint.type", "integer");
+
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
+
+        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName); // read the published message back from the same destination
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals(true, message.getObjectProperty("foo") instanceof String); // getObjectProperty exposes the stored Java type, which is what the ".type" attribute should control
+        assertEquals("foo", message.getStringProperty("foo"));
+        assertEquals(true, message.getObjectProperty("myboolean") instanceof Boolean);
+        assertEquals(true, message.getBooleanProperty("myboolean"));
+        assertEquals(true, message.getObjectProperty("mybyte") instanceof Byte);
+        assertEquals(127, message.getByteProperty("mybyte"));
+        assertEquals(true, message.getObjectProperty("myshort") instanceof Short);
+        assertEquals(16384, message.getShortProperty("myshort"));
+        assertEquals(true, message.getObjectProperty("myinteger") instanceof Integer);
+        assertEquals(1544000, message.getIntProperty("myinteger"));
+        assertEquals(true, message.getObjectProperty("mylong") instanceof Long);
+        assertEquals(9876543210L, message.getLongProperty("mylong"));
+        assertEquals(true, message.getObjectProperty("myfloat") instanceof Float);
+        assertEquals(3.14F, message.getFloatProperty("myfloat"), 0.001F);
+        assertEquals(true, message.getObjectProperty("mydouble") instanceof Double);
+        assertEquals(3.14159265359D, message.getDoubleProperty("mydouble"), 0.00000000001D);
+        assertEquals(true, message.getObjectProperty("badtype") instanceof String);
+        assertEquals("3.14", message.getStringProperty("badtype"));
+        assertFalse(message.propertyExists("badint")); // "3.14" cannot be parsed as an integer, so the property is expected to be dropped rather than sent
+
+        runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
+    }
 }


[3/3] nifi git commit: Merge branch 'pr2949'

Posted by de...@apache.org.
Merge branch 'pr2949'


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/895323f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/895323f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/895323f3

Branch: refs/heads/master
Commit: 895323f3c2ab8541a663ba35ed98209c256f7c12
Parents: 813cc1f 4f538f1
Author: Brandon Devries <de...@apache.org>
Authored: Tue Oct 2 09:40:36 2018 -0400
Committer: Brandon Devries <de...@apache.org>
Committed: Tue Oct 2 09:40:36 2018 -0400

----------------------------------------------------------------------
 .../nifi/jms/processors/JMSPublisher.java       | 70 +++++++++++++++++--
 .../apache/nifi/jms/processors/PublishJMS.java  |  6 +-
 .../nifi/jms/processors/PublishJMSIT.java       | 73 ++++++++++++++++++++
 3 files changed, 144 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[2/3] nifi git commit: NIFI-3672 updated PublishJMS message property docs

Posted by de...@apache.org.
NIFI-3672 updated PublishJMS message property docs

This closes #2949

Signed-off-by: Brandon Devries <de...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4f538f1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4f538f1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4f538f1e

Branch: refs/heads/master
Commit: 4f538f1ecb8e73859e8875bff10ab18971897180
Parents: 66eeb48
Author: Mike Moser <mo...@apache.org>
Authored: Tue Aug 14 18:55:10 2018 +0000
Committer: Brandon Devries <de...@apache.org>
Committed: Tue Oct 2 09:39:17 2018 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/jms/processors/PublishJMS.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4f538f1e/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index f58b9cf..3afa0f0 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -72,7 +72,11 @@ import org.springframework.jms.support.JmsHeaders;
         @ReadsAttribute(attribute = JmsHeaders.TYPE, description = "This attribute becomes the JMSType message header. Must be an integer."),
         @ReadsAttribute(attribute = JmsHeaders.REPLY_TO, description = "This attribute becomes the JMSReplyTo message header. Must be an integer."),
         @ReadsAttribute(attribute = JmsHeaders.DESTINATION, description = "This attribute becomes the JMSDestination message header. Must be an integer."),
-        @ReadsAttribute(attribute = "other attributes", description = "All other attributes that do not start with " + JmsHeaders.PREFIX + " are added as message properties.")
+        @ReadsAttribute(attribute = "other attributes", description = "All other attributes that do not start with " + JmsHeaders.PREFIX + " are added as message properties."),
+        @ReadsAttribute(attribute = "other attributes .type", description = "When an attribute will be added as a message property, a second attribute of the same name but with an extra"
+        + " `.type` at the end will cause the message property to be sent using that strong type. For example, attribute `delay` with value `12000` and another attribute"
+        + " `delay.type` with value `integer` will cause a JMS message property `delay` to be sent as an Integer rather than a String. Supported types are boolean, byte,"
+        + " short, integer, long, float, double, and string (which is the default).")
 })
 @SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
 @SystemResourceConsideration(resource = SystemResource.MEMORY)