You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/04 16:08:25 UTC

nifi git commit: NIFI-1686 - NiFi is unable to populate over 1/4 of AMQP properties from flow properties This closes #305 [Forced Update!]

Repository: nifi
Updated Branches:
  refs/heads/master b531b97a4 -> e02c79975 (forced update)


NIFI-1686 - NiFi is unable to populate over 1/4 of AMQP properties from flow properties
This closes #305


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e02c7997
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e02c7997
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e02c7997

Branch: refs/heads/master
Commit: e02c79975ed8db69e63d96a28e81db08bc869e54
Parents: 25290ce
Author: Stephen Harper <st...@gmail.com>
Authored: Sat Mar 26 20:06:50 2016 +0000
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Apr 4 10:07:50 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/amqp/processors/AMQPUtils.java  | 169 +++++++++++++++++--
 .../nifi/amqp/processors/PublishAMQP.java       |  81 ++++++---
 .../nifi/amqp/processors/PublishAMQPTest.java   |  41 +++++
 3 files changed, 255 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e02c7997/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
index 618965a..6cfa2c7 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
@@ -18,9 +18,8 @@ package org.apache.nifi.amqp.processors;
 
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.flowfile.FlowFile;
@@ -43,16 +42,48 @@ abstract class AMQPUtils {
 
     private final static Logger logger = LoggerFactory.getLogger(AMQPUtils.class);
 
-    private final static List<String> propertyNames = Arrays.asList("amqp$contentType", "amqp$contentEncoding",
-            "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
-            "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
-            "amqp$clusterId");
-    /**
-     * Returns a {@link List} of AMQP property names defined in
-     * {@link BasicProperties}
-     */
-    public static List<String> getAmqpPropertyNames() {
-        return propertyNames;
+    public enum PropertyNames {
+        CONTENT_TYPE (AMQP_PROP_PREFIX + "contentType"),
+        CONTENT_ENCODING (AMQP_PROP_PREFIX + "contentEncoding"),
+        HEADERS (AMQP_PROP_PREFIX + "headers"),
+        DELIVERY_MODE (AMQP_PROP_PREFIX + "deliveryMode"),
+        PRIORITY (AMQP_PROP_PREFIX + "priority"),
+        CORRELATION_ID (AMQP_PROP_PREFIX + "correlationId"),
+        REPLY_TO (AMQP_PROP_PREFIX + "replyTo"),
+        EXPIRATION (AMQP_PROP_PREFIX + "expiration"),
+        MESSAGE_ID (AMQP_PROP_PREFIX + "messageId"),
+        TIMESTAMP (AMQP_PROP_PREFIX + "timestamp"),
+        TYPE (AMQP_PROP_PREFIX + "type"),
+        USER_ID (AMQP_PROP_PREFIX + "userId"),
+        APP_ID (AMQP_PROP_PREFIX + "appId"),
+        CLUSTER_ID (AMQP_PROP_PREFIX + "clusterId");
+
+        PropertyNames(String value) {
+            this.value = value;
+        }
+
+        private final String value;
+
+        private static final Map<String, PropertyNames> lookup = new HashMap<>();
+
+        public static PropertyNames fromValue(String s) {
+            return lookup.get(s);
+        }
+
+        static {
+            for(PropertyNames propertyNames : PropertyNames.values()) {
+                lookup.put(propertyNames.getValue(), propertyNames);
+            }
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
     }
 
     /**
@@ -69,14 +100,14 @@ abstract class AMQPUtils {
         if (amqpProperties != null){
             try {
                 Method[] methods = BasicProperties.class.getDeclaredMethods();
-                Map<String, String> attributes = new HashMap<String, String>();
+                Map<String, String> attributes = new HashMap<>();
                 for (Method method : methods) {
                     if (Modifier.isPublic(method.getModifiers()) && method.getName().startsWith("get")) {
                         Object amqpPropertyValue = method.invoke(amqpProperties);
                         if (amqpPropertyValue != null) {
                             String propertyName = extractPropertyNameFromMethod(method);
                             if (isValidAmqpPropertyName(propertyName)) {
-                                if (propertyName.equals(AMQP_PROP_PREFIX + "contentType")) {
+                                if (propertyName.equals(PropertyNames.CONTENT_TYPE.getValue())) {
                                     attributes.put(CoreAttributes.MIME_TYPE.key(), amqpPropertyValue.toString());
                                 }
                                 attributes.put(propertyName, amqpPropertyValue.toString());
@@ -95,14 +126,12 @@ abstract class AMQPUtils {
     /**
      * Will validate if provided name corresponds to valid AMQP property.
      *
-     * @see AMQPUtils#getAmqpPropertyNames()
-     *
      * @param name
      *            the name of the property
      * @return 'true' if valid otherwise 'false'
      */
     public static boolean isValidAmqpPropertyName(String name) {
-        return propertyNames.contains(name);
+        return PropertyNames.fromValue(name) != null;
     }
 
     /**
@@ -113,4 +142,110 @@ abstract class AMQPUtils {
         c[0] = Character.toLowerCase(c[0]);
         return AMQP_PROP_PREFIX + new String(c);
     }
+
+    /**
+     * Will validate if provided amqpPropValue can be converted to a {@link Map}.
+     * Should be passed in the format: amqp$headers=key=value,key=value etc.
+     *
+     * @param amqpPropValue
+     *            the value of the property
+     * @return {@link Map} if valid otherwise null
+     */
+    public static Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue){
+        String[] strEntries = amqpPropValue.split(",");
+        Map<String, Object> headers = new HashMap<>();
+        for (String strEntry : strEntries) {
+            String[] kv = strEntry.split("=");
+            if (kv.length == 2) {
+                headers.put(kv[0].trim(), kv[1].trim());
+            } else {
+                logger.warn("Malformed key value pair for AMQP header property: " + amqpPropValue);
+            }
+        }
+
+        return headers;
+    }
+
+    /**
+     * Will validate if provided amqpPropValue can be converted to an {@link Integer}, and that its
+     * value is 1 or 2.
+     *
+     * @param amqpPropValue
+     *            the value of the property
+     * @return {@link Integer} if valid otherwise null
+     */
+    public static Integer validateAMQPDeliveryModeProperty(String amqpPropValue){
+        Integer deliveryMode = toInt(amqpPropValue);
+
+        if (deliveryMode == null || !(deliveryMode == 1 || deliveryMode == 2)) {
+            logger.warn("Invalid value for AMQP deliveryMode property: " + amqpPropValue);
+        }
+        return deliveryMode;
+    }
+
+    /**
+     * Will validate if provided amqpPropValue can be converted to an {@link Integer} and that its
+     * value is between 0 and 9 (inclusive).
+     *
+     * @param amqpPropValue
+     *            the value of the property
+     * @return {@link Integer} if valid otherwise null
+     */
+    public static Integer validateAMQPPriorityProperty(String amqpPropValue){
+        Integer priority = toInt(amqpPropValue);
+
+        if (priority == null || !(priority >= 0 && priority <= 9)){
+            logger.warn("Invalid value for AMQP priority property: " + amqpPropValue);
+        }
+        return priority;
+    }
+
+    /**
+     * Will validate if provided amqpPropValue can be converted to a {@link Date}.
+     *
+     * @param amqpPropValue
+     *            the value of the property
+     * @return {@link Date} if valid otherwise null
+     */
+    public static Date validateAMQPTimestampProperty(String amqpPropValue){
+        Long timestamp = toLong(amqpPropValue);
+
+        if (timestamp == null){
+            logger.warn("Invalid value for AMQP timestamp property: " + amqpPropValue);
+            return null;
+        }
+
+        //milliseconds are lost when sending to AMQP
+        return new Date(timestamp);
+    }
+
+    /**
+     * Takes a {@link String} and tries to convert to an {@link Integer}.
+     *
+     * @param strVal
+     *            the value to be converted
+     * @return {@link Integer} if valid otherwise null
+     */
+    private static Integer toInt(String strVal){
+        try {
+            return Integer.parseInt(strVal);
+        } catch (NumberFormatException aE){
+            return null;
+        }
+    }
+
+    /**
+     * Takes a {@link String} and tries to convert to a {@link Long}.
+     *
+     * @param strVal
+     *            the value to be converted
+     * @return {@link Long} if valid otherwise null
+     */
+    private static Long toLong(String strVal){
+        try {
+            return Long.parseLong(strVal);
+        } catch (NumberFormatException aE){
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e02c7997/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index b1e442e..85116c2 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -18,14 +18,12 @@ package org.apache.nifi.amqp.processors;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -97,8 +95,6 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
 
     private final static Set<Relationship> relationships;
 
-    private final static List<String> amqpPropertyNames = AMQPUtils.getAmqpPropertyNames();
-
     /*
      * Will ensure that the list of property descriptors is build only once.
      * Will also create a Set of relationships
@@ -118,11 +114,11 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
 
     /**
      * Will construct AMQP message by extracting its body from the incoming
-     * {@link FlowFile}. AMQP {@link Properties} will be extracted from the
+     * {@link FlowFile}. AMQP Properties will be extracted from the
      * {@link FlowFile} and converted to {@link BasicProperties} to be sent
      * along with the message. Upon success the incoming {@link FlowFile} is
-     * transfered to 'success' {@link Relationship} and upon failure FlowFile is
-     * penalized and transfered to the 'failure' {@link Relationship}
+     * transferred to 'success' {@link Relationship} and upon failure FlowFile is
+     * penalized and transferred to the 'failure' {@link Relationship}
      * <br>
      * NOTE: Attributes extracted from {@link FlowFile} are considered
      * candidates for AMQP properties if their names are prefixed with
@@ -195,26 +191,73 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
      * properties if their names are prefixed with
-     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
+     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml).
+     *
+     * Some fields require a specific format and are validated:
+     *
+     * {@link AMQPUtils#validateAMQPHeaderProperty}
+     * {@link AMQPUtils#validateAMQPDeliveryModeProperty}
+     * {@link AMQPUtils#validateAMQPPriorityProperty}
+     * {@link AMQPUtils#validateAMQPTimestampProperty}
      */
     private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
         Map<String, String> attributes = flowFile.getAttributes();
         AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
         for (Entry<String, String> attributeEntry : attributes.entrySet()) {
             if (attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
-                String amqpPropName = attributeEntry.getKey().split("\\" + AMQPUtils.AMQP_PROP_DELIMITER)[1];
+                String amqpPropName = attributeEntry.getKey();
                 String amqpPropValue = attributeEntry.getValue();
-                try {
-                    if (amqpPropertyNames.contains(AMQPUtils.AMQP_PROP_PREFIX + amqpPropName)) {
-                        Method m = builder.getClass().getDeclaredMethod(amqpPropName, String.class);
-                        m.invoke(builder, amqpPropValue);
-                    } else {
-                        getLogger().warn("Unrecogninsed AMQP property '" + amqpPropName + "', will ignore.");
+
+                AMQPUtils.PropertyNames propertyNames = AMQPUtils.PropertyNames.fromValue(amqpPropName);
+
+                if (propertyNames != null) {
+                    switch (propertyNames){
+                        case CONTENT_TYPE:
+                            builder.contentType(amqpPropValue);
+                            break;
+                        case CONTENT_ENCODING:
+                            builder.contentEncoding(amqpPropValue);
+                            break;
+                        case HEADERS:
+                            builder.headers(AMQPUtils.validateAMQPHeaderProperty(amqpPropValue));
+                            break;
+                        case DELIVERY_MODE:
+                            builder.deliveryMode(AMQPUtils.validateAMQPDeliveryModeProperty(amqpPropValue));
+                            break;
+                        case PRIORITY:
+                            builder.priority(AMQPUtils.validateAMQPPriorityProperty(amqpPropValue));
+                            break;
+                        case CORRELATION_ID:
+                            builder.correlationId(amqpPropValue);
+                            break;
+                        case REPLY_TO:
+                            builder.replyTo(amqpPropValue);
+                            break;
+                        case EXPIRATION:
+                            builder.expiration(amqpPropValue);
+                            break;
+                        case MESSAGE_ID:
+                            builder.messageId(amqpPropValue);
+                            break;
+                        case TIMESTAMP:
+                            builder.timestamp(AMQPUtils.validateAMQPTimestampProperty(amqpPropValue));
+                            break;
+                        case TYPE:
+                            builder.type(amqpPropValue);
+                            break;
+                        case USER_ID:
+                            builder.userId(amqpPropValue);
+                            break;
+                        case APP_ID:
+                            builder.appId(amqpPropValue);
+                            break;
+                        case CLUSTER_ID:
+                            builder.clusterId(amqpPropValue);
+                            break;
                     }
-                } catch (Exception e) {
-                    // should really never happen since it should be caught by
-                    // the above IF.
-                    getLogger().warn("Failed while trying to build AMQP Properties.", e);
+
+                } else {
+                    getLogger().warn("Unrecognised AMQP property '" + amqpPropName + "', will ignore.");
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e02c7997/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 3a9b8d1..fec3d50 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -18,9 +18,11 @@ package org.apache.nifi.amqp.processors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +53,20 @@ public class PublishAMQPTest {
         Map<String, String> attributes = new HashMap<>();
         attributes.put("foo", "bar");
         attributes.put("amqp$contentType", "foo/bar");
+        attributes.put("amqp$contentEncoding", "foobar123");
+        attributes.put("amqp$headers", "foo=bar,foo2=bar2,foo3");
+        attributes.put("amqp$deliveryMode", "1");
+        attributes.put("amqp$priority", "2");
+        attributes.put("amqp$correlationId", "correlationId123");
+        attributes.put("amqp$replyTo", "replyTo123");
+        attributes.put("amqp$expiration", "expiration123");
+        attributes.put("amqp$messageId", "messageId123");
+        attributes.put("amqp$timestamp", "123456789");
+        attributes.put("amqp$type", "type123");
+        attributes.put("amqp$userId", "userId123");
+        attributes.put("amqp$appId", "appId123");
+        attributes.put("amqp$clusterId", "clusterId123");
+
         runner.enqueue("Hello Joe".getBytes(), attributes);
 
         runner.run();
@@ -60,6 +76,31 @@ public class PublishAMQPTest {
         GetResponse msg1 = channel.basicGet("queue1", true);
         assertNotNull(msg1);
         assertEquals("foo/bar", msg1.getProps().getContentType());
+
+        assertEquals("foobar123", msg1.getProps().getContentEncoding());
+
+        Map<String, Object> headerMap = msg1.getProps().getHeaders();
+
+        Object foo = headerMap.get("foo");
+        Object foo2 = headerMap.get("foo2");
+        Object foo3 = headerMap.get("foo3");
+
+        assertEquals("bar", foo.toString());
+        assertEquals("bar2", foo2.toString());
+        assertNull(foo3);
+
+        assertEquals((Integer) 1, msg1.getProps().getDeliveryMode());
+        assertEquals((Integer) 2, msg1.getProps().getPriority());
+        assertEquals("correlationId123", msg1.getProps().getCorrelationId());
+        assertEquals("replyTo123", msg1.getProps().getReplyTo());
+        assertEquals("expiration123", msg1.getProps().getExpiration());
+        assertEquals("messageId123", msg1.getProps().getMessageId());
+        assertEquals(new Date(123456789L), msg1.getProps().getTimestamp());
+        assertEquals("type123", msg1.getProps().getType());
+        assertEquals("userId123", msg1.getProps().getUserId());
+        assertEquals("appId123", msg1.getProps().getAppId());
+        assertEquals("clusterId123", msg1.getProps().getClusterId());
+
         assertNotNull(channel.basicGet("queue2", true));
     }