You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by de...@apache.org on 2018/10/02 13:43:26 UTC
[1/3] nifi git commit: NIFI-3672 Add support for strongly typed
message properties in PublishJMS
Repository: nifi
Updated Branches:
refs/heads/master 813cc1f6a -> 895323f3c
NIFI-3672 Add support for strongly typed message properties in PublishJMS
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/66eeb488
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/66eeb488
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/66eeb488
Branch: refs/heads/master
Commit: 66eeb48802317cdff69fe83070d26dac7245294a
Parents: 5106dc0
Author: Mike Moser <mo...@apache.org>
Authored: Mon Aug 13 17:40:54 2018 +0000
Committer: Mike Moser <mo...@apache.org>
Committed: Tue Aug 14 16:37:35 2018 +0000
----------------------------------------------------------------------
.../nifi/jms/processors/JMSPublisher.java | 70 +++++++++++++++++--
.../nifi/jms/processors/PublishJMSIT.java | 73 ++++++++++++++++++++
2 files changed, 139 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/66eeb488/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 9912c81..392157f 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -16,8 +16,11 @@
*/
package org.apache.nifi.jms.processors;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@@ -78,11 +81,14 @@ final class JMSPublisher extends JMSWorker {
void setMessageHeaderAndProperties(final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
- for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
+
+ Map<String, String> flowFileAttributesToSend = flowFileAttributes.entrySet().stream()
+ .filter(entry -> !entry.getKey().contains("-") && !entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property names
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+ for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
try {
- if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names
- message.setStringProperty(entry.getKey(), entry.getValue());
- } else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
+ if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
} else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
message.setJMSExpiration(Integer.parseInt(entry.getValue()));
@@ -110,6 +116,11 @@ final class JMSPublisher extends JMSWorker {
} else {
logUnbuildableDestination(entry.getKey(), JmsHeaders.DESTINATION);
}
+ } else {
+ // not a special attribute handled above, so send it as a property using the specified property type
+ String type = flowFileAttributes.getOrDefault(entry.getKey().concat(".type"), "unknown").toLowerCase();
+ propertySetterMap.getOrDefault(type, JmsPropertySetterEnum.STRING)
+ .setProperty(message, entry.getKey(), entry.getValue());
}
} catch (NumberFormatException ne) {
this.processLog.warn("Incompatible value for attribute " + entry.getKey()
@@ -146,4 +157,55 @@ final class JMSPublisher extends JMSWorker {
return destination;
}
+
+ /**
+ * Implementations of this interface use {@link javax.jms.Message} methods to set strongly typed properties.
+ */
+ public interface JmsPropertySetter {
+ void setProperty(final Message message, final String name, final String value) throws JMSException, NumberFormatException;
+ }
+
+ public enum JmsPropertySetterEnum implements JmsPropertySetter {
+ BOOLEAN( (message, name, value) -> {
+ message.setBooleanProperty(name, Boolean.parseBoolean(value));
+ } ),
+ BYTE( (message, name, value) -> {
+ message.setByteProperty(name, Byte.parseByte(value));
+ } ),
+ SHORT( (message, name, value) -> {
+ message.setShortProperty(name, Short.parseShort(value));
+ } ),
+ INTEGER( (message, name, value) -> {
+ message.setIntProperty(name, Integer.parseInt(value));
+ } ),
+ LONG( (message, name, value) -> {
+ message.setLongProperty(name, Long.parseLong(value));
+ } ),
+ FLOAT( (message, name, value) -> {
+ message.setFloatProperty(name, Float.parseFloat(value));
+ } ),
+ DOUBLE( (message, name, value) -> {
+ message.setDoubleProperty(name, Double.parseDouble(value));
+ } ),
+ STRING( (message, name, value) -> {
+ message.setStringProperty(name, value);
+ } );
+
+ private final JmsPropertySetter setter;
+ JmsPropertySetterEnum(JmsPropertySetter setter) {
+ this.setter = setter;
+ }
+
+ public void setProperty(Message message, String name, String value) throws JMSException, NumberFormatException {
+ setter.setProperty(message, name, value);
+ }
+ }
+
+ /**
+ * This map helps us avoid using JmsPropertySetterEnum.valueOf and dealing with IllegalArgumentException on failed lookup.
+ */
+ public static Map<String, JmsPropertySetterEnum> propertySetterMap = new HashMap<>();
+ static {
+ Arrays.stream(JmsPropertySetterEnum.values()).forEach(e -> propertySetterMap.put(e.name().toLowerCase(), e));
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/66eeb488/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index a365ad5..fa0bd7a 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -180,4 +181,76 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
+
+ @Test(timeout = 10000)
+ public void validatePublishPropertyTypes() throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+ final String destinationName = "validatePublishPropertyTypes";
+ PublishJMS pubProc = new PublishJMS();
+ TestRunner runner = TestRunners.newTestRunner(pubProc);
+ JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("foo", "foo");
+ attributes.put("myboolean", "true");
+ attributes.put("myboolean.type", "boolean");
+ attributes.put("mybyte", "127");
+ attributes.put("mybyte.type", "byte");
+ attributes.put("myshort", "16384");
+ attributes.put("myshort.type", "short");
+ attributes.put("myinteger", "1544000");
+ attributes.put("myinteger.type", "INTEGER"); // test upper case
+ attributes.put("mylong", "9876543210");
+ attributes.put("mylong.type", "long");
+ attributes.put("myfloat", "3.14");
+ attributes.put("myfloat.type", "float");
+ attributes.put("mydouble", "3.14159265359");
+ attributes.put("mydouble.type", "double");
+ attributes.put("badtype", "3.14");
+ attributes.put("badtype.type", "pi"); // pi not recognized as a type, so send as String
+ attributes.put("badint", "3.14"); // value is not an integer
+ attributes.put("badint.type", "integer");
+
+ runner.enqueue("Hey dude!".getBytes(), attributes);
+ runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left intact so that we can use it.
+
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+ assertNotNull(successFF);
+
+ JmsTemplate jmst = new JmsTemplate(cf);
+ BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+ byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+ assertEquals("Hey dude!", new String(messageBytes));
+ assertEquals(true, message.getObjectProperty("foo") instanceof String);
+ assertEquals("foo", message.getStringProperty("foo"));
+ assertEquals(true, message.getObjectProperty("myboolean") instanceof Boolean);
+ assertEquals(true, message.getBooleanProperty("myboolean"));
+ assertEquals(true, message.getObjectProperty("mybyte") instanceof Byte);
+ assertEquals(127, message.getByteProperty("mybyte"));
+ assertEquals(true, message.getObjectProperty("myshort") instanceof Short);
+ assertEquals(16384, message.getShortProperty("myshort"));
+ assertEquals(true, message.getObjectProperty("myinteger") instanceof Integer);
+ assertEquals(1544000, message.getIntProperty("myinteger"));
+ assertEquals(true, message.getObjectProperty("mylong") instanceof Long);
+ assertEquals(9876543210L, message.getLongProperty("mylong"));
+ assertEquals(true, message.getObjectProperty("myfloat") instanceof Float);
+ assertEquals(3.14F, message.getFloatProperty("myfloat"), 0.001F);
+ assertEquals(true, message.getObjectProperty("mydouble") instanceof Double);
+ assertEquals(3.14159265359D, message.getDoubleProperty("mydouble"), 0.00000000001D);
+ assertEquals(true, message.getObjectProperty("badtype") instanceof String);
+ assertEquals("3.14", message.getStringProperty("badtype"));
+ assertFalse(message.propertyExists("badint"));
+
+ runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
+ }
}
[3/3] nifi git commit: Merge branch 'pr2949'
Posted by de...@apache.org.
Merge branch 'pr2949'
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/895323f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/895323f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/895323f3
Branch: refs/heads/master
Commit: 895323f3c2ab8541a663ba35ed98209c256f7c12
Parents: 813cc1f 4f538f1
Author: Brandon Devries <de...@apache.org>
Authored: Tue Oct 2 09:40:36 2018 -0400
Committer: Brandon Devries <de...@apache.org>
Committed: Tue Oct 2 09:40:36 2018 -0400
----------------------------------------------------------------------
.../nifi/jms/processors/JMSPublisher.java | 70 +++++++++++++++++--
.../apache/nifi/jms/processors/PublishJMS.java | 6 +-
.../nifi/jms/processors/PublishJMSIT.java | 73 ++++++++++++++++++++
3 files changed, 144 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[2/3] nifi git commit: NIFI-3672 updated PublishJMS message property
docs
Posted by de...@apache.org.
NIFI-3672 updated PublishJMS message property docs
This closes #2949
Signed-off-by: Brandon Devries <de...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4f538f1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4f538f1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4f538f1e
Branch: refs/heads/master
Commit: 4f538f1ecb8e73859e8875bff10ab18971897180
Parents: 66eeb48
Author: Mike Moser <mo...@apache.org>
Authored: Tue Aug 14 18:55:10 2018 +0000
Committer: Brandon Devries <de...@apache.org>
Committed: Tue Oct 2 09:39:17 2018 -0400
----------------------------------------------------------------------
.../main/java/org/apache/nifi/jms/processors/PublishJMS.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4f538f1e/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index f58b9cf..3afa0f0 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -72,7 +72,11 @@ import org.springframework.jms.support.JmsHeaders;
@ReadsAttribute(attribute = JmsHeaders.TYPE, description = "This attribute becomes the JMSType message header. Must be an integer."),
@ReadsAttribute(attribute = JmsHeaders.REPLY_TO, description = "This attribute becomes the JMSReplyTo message header. Must be an integer."),
@ReadsAttribute(attribute = JmsHeaders.DESTINATION, description = "This attribute becomes the JMSDestination message header. Must be an integer."),
- @ReadsAttribute(attribute = "other attributes", description = "All other attributes that do not start with " + JmsHeaders.PREFIX + " are added as message properties.")
+ @ReadsAttribute(attribute = "other attributes", description = "All other attributes that do not start with " + JmsHeaders.PREFIX + " are added as message properties."),
+ @ReadsAttribute(attribute = "other attributes .type", description = "When an attribute will be added as a message property, a second attribute of the same name but with an extra"
+ + " `.type` at the end will cause the message property to be sent using that strong type. For example, attribute `delay` with value `12000` and another attribute"
+ + " `delay.type` with value `integer` will cause a JMS message property `delay` to be sent as an Integer rather than a String. Supported types are boolean, byte,"
+ + " short, integer, long, float, double, and string (which is the default).")
})
@SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
@SystemResourceConsideration(resource = SystemResource.MEMORY)