You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2022/01/18 21:46:46 UTC

[nifi] branch main updated: NIFI-7865 amqp$header is split in the wrong way for ", " and "}"

This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e603b017 NIFI-7865 amqp$header is split in the wrong way for "," and "}"
e603b017 is described below

commit e603b0179bf46ef26bbf4713ae68553b19d4c485
Author: sedadgn <se...@abas.de>
AuthorDate: Thu Oct 14 13:24:35 2021 +0200

    NIFI-7865 amqp$header is split in the wrong way for "," and "}"
    
    This PR introduces two new properties for the ConsumeAMQP processor
    and one new property for the PublishAMQP processor.
    
    This allows the processors to be configured with a custom header separator character, so that commas inside header values are no longer mistaken for separators, and to consistently omit the curly braces in the amqp$headers attribute.
    
    The default values ensure backwards compatibility.
    
    This closes #5458.
    
    Signed-off-by: Kevin Doran <kd...@apache.org>
---
 .../nifi/processor/util/StandardValidators.java    |  20 +++
 .../apache/nifi/amqp/processors/ConsumeAMQP.java   |  54 +++++++-
 .../apache/nifi/amqp/processors/PublishAMQP.java   |  28 ++--
 .../nifi/amqp/processors/ConsumeAMQPTest.java      | 144 +++++++++++++++++++++
 .../nifi/amqp/processors/PublishAMQPTest.java      |  49 +++++++
 5 files changed, 283 insertions(+), 12 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 1dad5d4..7c2276a 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -375,6 +375,25 @@ public class StandardValidators {
         }
     };
 
+    public static final Validator SINGLE_CHAR_VALIDATOR = (subject, input, context) -> {
+        if (input == null) {
+            return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(false)
+                    .explanation("Input is null for this property")
+                    .build();
+        }
+        if (input.length() != 1) {
+            return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(false)
+                    .explanation("Value must be exactly 1 character but was " + input.length() + " in length")
+                    .build();
+        }
+        return new ValidationResult.Builder().input(input).subject(subject).valid(true).build();
+    };
     /**
      * URL Validator that does not allow the Expression Language to be used
      */
@@ -980,4 +999,5 @@ public class StandardValidators {
             return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
         }
     }
+
 }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 7a1885c..734d3ef 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -98,6 +98,24 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
         .defaultValue("10")
         .required(true)
         .build();
+    public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
+       .name("header.separator")
+       .displayName("Header Separator")
+       .description("The character that is used to separate key-value for header in String. The value must only one character."
+               + "Otherwise you will get an error message")
+       .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
+       .defaultValue(",")
+       .required(false)
+       .build();
+    static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder()
+        .name("remove.curly.braces")
+        .displayName("Remove Curly Braces")
+        .description("If true Remove Curly Braces, Curly Braces in the header will be automatically remove.")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .defaultValue("False")
+        .allowableValues("True", "False")
+        .required(false)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -112,6 +130,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
         properties.add(QUEUE);
         properties.add(AUTO_ACKNOWLEDGE);
         properties.add(BATCH_SIZE);
+        properties.add(REMOVE_CURLY_BRACES);
+        properties.add(HEADER_SEPARATOR);
         properties.addAll(getCommonPropertyDescriptors());
         propertyDescriptors = Collections.unmodifiableList(properties);
 
@@ -149,7 +169,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
 
             final BasicProperties amqpProperties = response.getProps();
             final Envelope envelope = response.getEnvelope();
-            final Map<String, String> attributes = buildAttributes(amqpProperties, envelope);
+            final Map<String, String> attributes = buildAttributes(amqpProperties, envelope, context.getProperty(REMOVE_CURLY_BRACES).asBoolean(),
+                    context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@@ -163,12 +184,12 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
         }
     }
 
-    private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope) {
+    private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope, boolean removeCurlyBraces,  Character valueSeperatorForHeaders) {
         final Map<String, String> attributes = new HashMap<>();
         addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", properties.getContentType());
-        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", properties.getHeaders());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", buildHeaders(properties.getHeaders(), removeCurlyBraces,valueSeperatorForHeaders));
         addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", properties.getDeliveryMode());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", properties.getPriority());
         addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", properties.getCorrelationId());
@@ -192,6 +213,33 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
         attributes.put(attributeName, value.toString());
     }
 
+    private String buildHeaders(Map<String, Object> headers,  boolean removeCurlyBraces,Character valueSeparatorForHeaders) {
+        if (headers == null) {
+            return null;
+        }
+        String headerString = convertMapToString(headers,valueSeparatorForHeaders);
+
+        if (!removeCurlyBraces) {
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append("{").append(headerString).append("}");
+            headerString = stringBuilder.toString();
+        }
+        return headerString;
+    }
+
+    public static String convertMapToString(Map<String, Object> headers,Character valueSeparatorForHeaders) {
+        StringBuilder stringBuilder = new StringBuilder();
+        boolean notFirst = false;
+        for (Map.Entry<String, Object> entry : headers.entrySet()) {
+            if (notFirst) {
+                stringBuilder.append(valueSeparatorForHeaders);
+            }
+            stringBuilder.append(entry.getKey()).append("=").append(entry.getValue().toString());
+            notFirst = true;
+        }
+        return stringBuilder.toString();
+    }
+
     @Override
     protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
         try {
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 1520d1b..4bd94f3 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -98,7 +99,15 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
-
+    public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder()
+            .name("header.separator")
+            .displayName("Header Separator")
+            .description("The character that is used to split key-value for headers. The value must only one character. "
+                    + "Otherwise you will get an error message")
+            .defaultValue(",")
+            .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
+            .required(false)
+            .build();
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are sent to the AMQP destination are routed to this relationship")
@@ -116,6 +125,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
         List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(EXCHANGE);
         properties.add(ROUTING_KEY);
+        properties.add(HEADER_SEPARATOR);
         properties.addAll(getCommonPropertyDescriptors());
         propertyDescriptors = Collections.unmodifiableList(properties);
 
@@ -143,7 +153,8 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
             return;
         }
 
-        final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile);
+        final BasicProperties amqpProperties = extractAmqpPropertiesFromFlowFile(flowFile,
+                context.getProperty(HEADER_SEPARATOR).toString().charAt(0));
         final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
         if (routingKey == null) {
             throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
@@ -224,7 +235,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      * {@link AMQPUtils#validateAMQPPriorityProperty}
      * {@link AMQPUtils#validateAMQPTimestampProperty}
      */
-    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile) {
+    private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile flowFile,Character headerSeparator) {
         final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
 
         updateBuilderFromAttribute(flowFile, "contentType", builder::contentType);
@@ -240,20 +251,20 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
         updateBuilderFromAttribute(flowFile, "userId", builder::userId);
         updateBuilderFromAttribute(flowFile, "appId", builder::appId);
         updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
-        updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers)));
+        updateBuilderFromAttribute(flowFile, "headers", headers -> builder.headers(validateAMQPHeaderProperty(headers,headerSeparator)));
 
         return builder.build();
     }
 
     /**
      * Will validate if provided amqpPropValue can be converted to a {@link Map}.
-     * Should be passed in the format: amqp$headers=key=value,key=value etc.
-     *
+     * Should be passed in the format: amqp$headers=key=value pairs joined by the separator character.
+     * @param splitValue the separator character used to split the property value into key=value entries
      * @param amqpPropValue the value of the property
      * @return {@link Map} if valid otherwise null
      */
-    private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue) {
-        String[] strEntries = amqpPropValue.split(",");
+    private Map<String, Object> validateAMQPHeaderProperty(String amqpPropValue,Character splitValue) {
+        String[] strEntries = amqpPropValue.split(Pattern.quote(String.valueOf(splitValue)));
         Map<String, Object> headers = new HashMap<>();
         for (String strEntry : strEntries) {
             String[] kv = strEntry.split("=");
@@ -263,7 +274,6 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
                 getLogger().warn("Malformed key value pair for AMQP header property: " + amqpPropValue);
             }
         }
-
         return headers;
     }
 }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 125593d..3704c9e 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -24,10 +24,12 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -35,8 +37,10 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Test;
 
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.MessageProperties;
 
@@ -161,6 +165,146 @@ public class ConsumeAMQPTest {
         }
     }
 
+    @Test
+    public void validateHeaderWithValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception {
+        final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("foo1","bar,bar");
+        headersMap.put("foo2","bar,bar");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|");
+            runner.run();
+            final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = convertStringToMap(headers.substring(1,headers.length()-1),"|");
+            Assert.assertEquals(headersMap,properties);
+        }
+    }
+    @Test
+    public void validateWithNotValidHeaderSeparatorParameter() {
+        final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+        final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+        ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+        TestRunner runner = initTestRunner(proc);
+        runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR, "|,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validateHeaderWithRemoveCurlyBracesParameterConsumeAndTransferToSuccess() throws Exception {
+        final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.run();
+            final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            successFF.assertAttributeEquals("amqp$headers", "key1=(bar,bar)");
+
+        }
+    }
+
+    @Test
+    public void validateHeaderWithRemoveCurlyBracesAndValueSeparatorForHeaderParameterConsumeAndTransferToSuccess() throws Exception {
+        final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","(bar,bar)");
+        headersMap.put("key2","(bar,bar)");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.REMOVE_CURLY_BRACES,"True");
+            runner.setProperty(ConsumeAMQP.HEADER_SEPARATOR,"|");
+
+            runner.run();
+            final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = convertStringToMap(headers,"|");
+            Assert.assertEquals(headersMap,properties);
+        }
+    }
+
+    @Test
+    public void validateHeaderWithoutParameterConsumeAndTransferToSuccess() throws Exception {
+        final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+        final Map<String, Object> headersMap = new HashMap<>();
+        headersMap.put("key1","bar");
+        headersMap.put("key2","bar2");
+
+        AMQP.BasicProperties.Builder builderBasicProperties = new AMQP.BasicProperties.Builder();
+        builderBasicProperties.headers(headersMap);
+
+        final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), builderBasicProperties.build(), "key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = initTestRunner(proc);
+
+            runner.run();
+            final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeEquals("amqp$routingKey", "key1");
+            successFF.assertAttributeEquals("amqp$exchange", "myExchange");
+            String headers = successFF.getAttribute("amqp$headers");
+            Map<String, String> properties = convertStringToMap(headers.substring(1,headers.length()-1),",");
+            Assert.assertEquals(headersMap,properties);
+        }
+    }
+
+
+    private Map<String,String> convertStringToMap(String map,String splitCharacter){
+        Map<String, String> headers = new HashMap<>();
+        String[] pairs = map.split(Pattern.quote(String.valueOf(splitCharacter)));
+        for (String pair : pairs) {
+            String[] keyValue = pair.split("=", 2);
+            Assert.assertEquals(2,keyValue.length);
+            headers.put(keyValue[0].trim(), keyValue[1].trim());
+        }
+        return headers;
+    }
     private TestRunner initTestRunner(ConsumeAMQP proc) {
         TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 345372a..3d7e8f7 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -107,6 +107,55 @@ public class PublishAMQPTest {
     }
 
     @Test
+    public void validateSuccessWithHeaderWithCommaPublishToSuccess() throws Exception {
+        final PublishAMQP pubProc = new LocalPublishAMQP();
+        final TestRunner runner = TestRunners.newTestRunner(pubProc);
+        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
+        runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
+        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
+        runner.setProperty(PublishAMQP.USER, "user");
+        runner.setProperty(PublishAMQP.PASSWORD, "password");
+        runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|");
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        attributes.put("amqp$headers", "foo=(bar,bar)|foo2=bar2|foo3");
+
+        runner.enqueue("Hello Joe".getBytes(), attributes);
+
+        runner.run();
+
+        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        final Channel channel = ((LocalPublishAMQP) pubProc).getConnection().createChannel();
+        final GetResponse msg1 = channel.basicGet("queue1", true);
+        assertNotNull(msg1);
+
+        final Map<String, Object> headerMap = msg1.getProps().getHeaders();
+
+        final Object foo = headerMap.get("foo");
+        final Object foo2 = headerMap.get("foo2");
+        final Object foo3 = headerMap.get("foo3");
+
+        assertEquals("(bar,bar)", foo.toString());
+        assertEquals("bar2", foo2.toString());
+        assertNull(foo3);
+
+
+        assertNotNull(channel.basicGet("queue2", true));
+    }
+
+    @Test
+    public void validateWithNotValidHeaderSeparatorParameter()  {
+        final PublishAMQP pubProc = new LocalPublishAMQP();
+        final TestRunner runner = TestRunners.newTestRunner(pubProc);
+        runner.setProperty(PublishAMQP.HEADER_SEPARATOR,"|,");
+        runner.assertNotValid();
+
+    }
+
+    @Test
     public void validateFailedPublishAndTransferToFailure() throws Exception {
         PublishAMQP pubProc = new LocalPublishAMQP();
         TestRunner runner = TestRunners.newTestRunner(pubProc);