You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/11/30 19:53:54 UTC

[nifi] 16/22: NIFI-10785 Allow publishing AMQP message with null header value; NIFI-10785 addressing review comment; NIFI-10785 addressing review comments (remove unnecessary property to ignore null headers)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.19
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 71f41b96baf602859c2d1ed9383ff1fd13268719
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Thu Nov 10 23:45:52 2022 +0100

    NIFI-10785 Allow publishing AMQP message with null header value
    NIFI-10785 addressing review comment
    NIFI-10785 addressing review comments (remove unnecessary property to ignore null headers)
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #6649.
---
 .../apache/nifi/amqp/processors/PublishAMQP.java   |  78 +++++++--------
 .../nifi/amqp/processors/PublishAMQPTest.java      | 111 +++++++++++++--------
 2 files changed, 103 insertions(+), 86 deletions(-)

diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 4bd94f3a41..6a8c7ac645 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -16,25 +16,15 @@
  */
 package org.apache.nifi.amqp.processors;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Connection;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -45,13 +35,19 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.AMQP.BasicProperties;
-import com.rabbitmq.client.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 @Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -89,6 +85,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(Validator.VALID)
             .build();
+
     public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
             .name("Routing Key")
             .description("The name of the Routing Key that will be used by AMQP to route messages from the exchange to a destination queue(s). "
@@ -99,6 +96,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+
     public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
             .name("header.separator")
             .displayName("Header Separator")
@@ -108,10 +106,12 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
             .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
             .required(false)
             .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
             .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship")
@@ -144,7 +144,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      *
      * NOTE: Attributes extracted from {@link FlowFile} are considered
      * candidates for AMQP properties if their names are prefixed with
-     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
+     * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml)
      */
     @Override
     protected void processResource(final Connection connection, final AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws ProcessException {
@@ -153,14 +153,16 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
             return;
         }
 
-        final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile,
-                context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
         final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
         if (routingKey == null) {
             throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
                 + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
         }
 
+        final Character separator = context.getProperty(HEADER_SEPARATOR).toString().charAt(0);
+
+        final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile, separator);
+
         final String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
         final byte[] messageContent = extractMessage(flowFile, session);
 
@@ -199,12 +201,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      */
     private byte[] extractMessage(FlowFile flowFile, ProcessSession session){
         final byte[] messageContent = new byte[(int) flowFile.getSize()];
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.fillBuffer(in, messageContent, true);
-            }
-        });
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true));
         return messageContent;
     }
 
@@ -226,16 +223,9 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      * Extracts AMQP properties from the {@link FlowFile} attributes. Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
      * properties if their names are prefixed with
-     * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml).
-     *
-     * Some fields require a specific format and are validated:
-     *
-     * {@link AMQPUtils#validateAMQPHeaderProperty}
-     * {@link AMQPUtils#validateAMQPDeliveryModeProperty}
-     * {@link AMQPUtils#validateAMQPPriorityProperty}
-     * {@link AMQPUtils#validateAMQPTimestampProperty}
+     * {@link PublishAMQP#ATTRIBUTES_PREFIX} (e.g., amqp$contentType=text/xml).
      */
-    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile,Character headerSeparator) {
+    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile, Character headerSeparator) {
         final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
 
         updateBuilderFromAttribute(flowFile, "contentType", builder::contentType);
@@ -251,7 +241,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
         updateBuilderFromAttribute(flowFile, "userId", builder::userId);
         updateBuilderFromAttribute(flowFile, "appId", builder::appId);
         updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers,headerSeparator)));
+        updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers, headerSeparator)));
 
         return builder.build();
     }
@@ -263,15 +253,17 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      * @param amqpPropValue the value of the property
      * @return {@link Map} if valid otherwise null
      */
-    private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue,Character splitValue) {
-        String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
-        Map<String, Object> headers = new HashMap<>();
+    private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue, Character splitValue) {
+        final String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
+        final Map<String, Object> headers = new HashMap<>();
         for (String strEntry : strEntries) {
-            String[] kv = strEntry.split("=");
+            final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored
             if (kv.length == 2) {
                 headers.put(kv[0].trim(), kv[1].trim());
+            } else if (kv.length == 1) {
+                headers.put(kv[0].trim(), null);
             } else {
-                getLogger().warn("Malformed key value pair for AMQP header property: " + amqpPropValue);
+                getLogger().warn(String.format("Malformed key value pair in AMQP header property (%s): %s", amqpPropValue, strEntry));
             }
         }
         return headers;
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 664862576f..72866ef4f6 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -16,6 +16,15 @@
  */
 package org.apache.nifi.amqp.processors;
 
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.GetResponse;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
@@ -24,19 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.GetResponse;
-import org.junit.jupiter.api.Test;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class PublishAMQPTest {
@@ -45,11 +43,13 @@ public class PublishAMQPTest {
     public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
         final PublishAMQP pubProc = new LocalPublishAMQP();
         final TestRunner runner = TestRunners.newTestRunner(pubProc);
-        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
-        runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
-        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
-        runner.setProperty(PublishAMQP.USER, "user");
-        runner.setProperty(PublishAMQP.PASSWORD, "password");
+        setConnectionProperties(runner);
+
+        final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
+            put("foo", "bar");
+            put("foo2", "bar2");
+            put("foo3", null);
+        }};
 
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("foo", "bar");
@@ -83,13 +83,7 @@ public class PublishAMQPTest {
 
         final Map<String, Object> headerMap = msg1.getProps().getHeaders();
 
-        final Object foo = headerMap.get("foo");
-        final Object foo2 = headerMap.get("foo2");
-        final Object foo3 = headerMap.get("foo3");
-
-        assertEquals("bar", foo.toString());
-        assertEquals("bar2", foo2.toString());
-        assertNull(foo3);
+        assertEquals(expectedHeaders, headerMap);
 
         assertEquals((Integer) 1, msg1.getProps().getDeliveryMode());
         assertEquals((Integer) 2, msg1.getProps().getPriority());
@@ -110,15 +104,16 @@ public class PublishAMQPTest {
     public void validateSuccessWithHeaderWithCommaPublishToSuccess() throws Exception {
         final PublishAMQP pubProc = new LocalPublishAMQP();
         final TestRunner runner = TestRunners.newTestRunner(pubProc);
-        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
-        runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
-        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
-        runner.setProperty(PublishAMQP.USER, "user");
-        runner.setProperty(PublishAMQP.PASSWORD, "password");
+        setConnectionProperties(runner);
         runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|");
 
-        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
+            put("foo", "(bar,bar)");
+            put("foo2", "bar2");
+            put("foo3", null);
+        }};
 
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3");
 
         runner.enqueue("Hello Joe".getBytes(), attributes);
@@ -134,14 +129,7 @@ public class PublishAMQPTest {
 
         final Map<String, Object> headerMap = msg1.getProps().getHeaders();
 
-        final Object foo = headerMap.get("foo");
-        final Object foo2 = headerMap.get("foo2");
-        final Object foo3 = headerMap.get("foo3");
-
-        assertEquals("(bar,bar)", foo.toString());
-        assertEquals("bar2", foo2.toString());
-        assertNull(foo3);
-
+        assertEquals(expectedHeaders, headerMap);
 
         assertNotNull(channel.basicGet("queue2", true));
     }
@@ -152,18 +140,48 @@ public class PublishAMQPTest {
         final TestRunner runner = TestRunners.newTestRunner(pubProc);
         runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|,");
         runner.assertNotValid();
+    }
+
+    @Test
+    public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception {
+        final PublishAMQP pubProc = new LocalPublishAMQP();
+        final TestRunner runner = TestRunners.newTestRunner(pubProc);
+        setConnectionProperties(runner);
+        runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|");
+
+        final Map<String, String> expectedHeaders = new HashMap<String, String>() {{
+            put("foo", "(bar,bar)");
+            put("foo2", "bar2");
+            put("foo3", null);
+        }};
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed");
+
+        runner.enqueue("Hello Joe".getBytes(), attributes);
+
+        runner.run();
+
+        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
+        final GetResponse msg1 = channel.basicGet("queue1", true);
+        assertNotNull(msg1);
+
+        final Map<String, Object> headerMap = msg1.getProps().getHeaders();
 
+        assertEquals(expectedHeaders, headerMap);
+
+        assertNotNull(channel.basicGet("queue2", true));
     }
 
     @Test
-    public void validateFailedPublishAndTransferToFailure() throws Exception {
+    public void validateFailedPublishAndTransferToFailure() {
         PublishAMQP pubProc = new LocalPublishAMQP();
         TestRunner runner = TestRunners.newTestRunner(pubProc);
-        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
-        runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
-        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
-        runner.setProperty(PublishAMQP.USER, "user");
-        runner.setProperty(PublishAMQP.PASSWORD, "password");
+        setConnectionProperties(runner);
+        runner.setProperty(PublishAMQP.EXCHANGE, "nonExistentExchange");
 
         runner.enqueue("Hello Joe".getBytes());
 
@@ -173,6 +191,13 @@ public class PublishAMQPTest {
         assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
     }
 
+    private void setConnectionProperties(TestRunner runner) {
+        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
+        runner.setProperty(PublishAMQP.USER, "user");
+        runner.setProperty(PublishAMQP.PASSWORD, "password");
+        runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
+        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
+    }
 
     public static class LocalPublishAMQP extends PublishAMQP {
         private TestConnection connection;