You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/02/08 17:58:14 UTC

nifi git commit: NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry relationship

Repository: nifi
Updated Branches:
  refs/heads/master 2673370cb -> eb6f9f0fe


NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry relationship

This closes #183

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/eb6f9f0f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/eb6f9f0f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/eb6f9f0f

Branch: refs/heads/master
Commit: eb6f9f0fec3042350864afee17a67ea62ef5f37a
Parents: 2673370
Author: ledor473 <le...@gmail.com>
Authored: Thu Jan 21 17:15:34 2016 -0500
Committer: jpercivall <jo...@yahoo.com>
Committed: Mon Feb 8 11:20:08 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/util/MockProcessSession.java    | 34 +++++++++--
 .../nifi/util/StandardProcessorTestRunner.java  | 22 +++++++
 .../java/org/apache/nifi/util/TestRunner.java   | 15 +++++
 .../nifi/processors/standard/InvokeHTTP.java    | 14 ++++-
 .../processors/standard/TestInvokeHTTP.java     |  2 +
 .../standard/util/TestInvokeHttpCommon.java     | 62 +++++++++++++++++++-
 6 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 67cf16e..ea45dbf 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -61,6 +61,7 @@ public class MockProcessSession implements ProcessSession {
     private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
     private final MockFlowFileQueue processorQueue;
     private final Set<Long> beingProcessed = new HashSet<>();
+    private final List<MockFlowFile> penalized = new ArrayList<>();
 
     private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
     private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
@@ -429,11 +430,22 @@ public class MockProcessSession implements ProcessSession {
     @Override
     public void remove(final FlowFile flowFile) {
         validateState(flowFile);
-        final Iterator<Long> itr = beingProcessed.iterator();
-        while (itr.hasNext()) {
-            final Long ffId = itr.next();
+
+        final Iterator<MockFlowFile> penalizedItr = penalized.iterator();
+        while (penalizedItr.hasNext()) {
+            final MockFlowFile ff = penalizedItr.next();
+            if (Objects.equals(ff.getId(), flowFile.getId())) {
+                penalizedItr.remove();
+                penalized.remove(ff);
+                break;
+            }
+        }
+
+        final Iterator<Long> processedItr = beingProcessed.iterator();
+        while (processedItr.hasNext()) {
+            final Long ffId = processedItr.next();
             if (ffId != null && ffId.equals(flowFile.getId())) {
-                itr.remove();
+                processedItr.remove();
                 beingProcessed.remove(ffId);
                 removedCount++;
                 currentVersions.remove(ffId);
@@ -522,6 +534,9 @@ public class MockProcessSession implements ProcessSession {
         for (final List<MockFlowFile> list : transferMap.values()) {
             for (final MockFlowFile flowFile : list) {
                 processorQueue.offer(flowFile);
+                if (penalize) {
+                    penalized.add(flowFile);
+                }
             }
         }
 
@@ -529,6 +544,9 @@ public class MockProcessSession implements ProcessSession {
             final MockFlowFile flowFile = originalVersions.get(flowFileId);
             if (flowFile != null) {
                 processorQueue.offer(flowFile);
+                if (penalize) {
+                    penalized.add(flowFile);
+                }
             }
         }
 
@@ -538,6 +556,9 @@ public class MockProcessSession implements ProcessSession {
         originalVersions.clear();
         transferMap.clear();
         clearTransferState();
+        if (!penalize) {
+            penalized.clear();
+        }
     }
 
     @Override
@@ -696,6 +717,10 @@ public class MockProcessSession implements ProcessSession {
         return list;
     }
 
+    public List<MockFlowFile> getPenalizedFlowFiles() {
+        return penalized;
+    }
+
     /**
      * @param relationship to get flowfiles for
      * @return a List of FlowFiles in the order in which they were transferred
@@ -1013,6 +1038,7 @@ public class MockProcessSession implements ProcessSession {
         final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
         currentVersions.put(newFlowFile.getId(), newFlowFile);
         newFlowFile.setPenalized();
+        penalized.add(newFlowFile);
         return newFlowFile;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7358b42..c7cded1 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -336,6 +336,11 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     @Override
+    public void assertPenalizeCount(final int count) {
+        Assert.assertEquals(count, getPenalizedFlowFiles().size());
+    }
+
+    @Override
     public void assertValid() {
         context.assertValid();
     }
@@ -453,6 +458,23 @@ public class StandardProcessorTestRunner implements TestRunner {
         return flowFiles;
     }
 
+    @Override
+    public List<MockFlowFile> getPenalizedFlowFiles() {
+        final List<MockFlowFile> flowFiles = new ArrayList<>();
+        for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+            flowFiles.addAll(session.getPenalizedFlowFiles());
+        }
+
+        Collections.sort(flowFiles, new Comparator<MockFlowFile>() {
+            @Override
+            public int compare(final MockFlowFile o1, final MockFlowFile o2) {
+                return Long.compare(o1.getCreationTime(), o2.getCreationTime());
+            }
+        });
+
+        return flowFiles;
+    }
+
     /**
      * @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted.
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 5e45299..d1211ef 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -291,6 +291,14 @@ public interface TestRunner {
     void assertTransferCount(String relationship, int count);
 
     /**
+     * Assert that the number of FlowFiles that were penalized is equal to the given count
+     *
+     * @param count
+     *            expected number of penalized FlowFiles
+     */
+    void assertPenalizeCount(int count);
+
+    /**
      * Assert that there are no FlowFiles left on the input queue.
      */
     void assertQueueEmpty();
@@ -438,6 +446,13 @@ public interface TestRunner {
     List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship);
 
     /**
+     * Returns a List of the FlowFiles that were penalized, ordered by creation time
+     *
+     * @return flowfiles that were penalized
+     */
+    List<MockFlowFile> getPenalizedFlowFiles();
+
+    /**
      * @return the {@link ProvenanceReporter} that will be used by the
      *         configured {@link Processor} for reporting Provenance Events
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 7576be3..4470bf6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -319,6 +319,14 @@ public final class InvokeHTTP extends AbstractProcessor {
             .allowableValues("true", "false")
             .build();
 
+    public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder()
+            .name("Penalize on \"No Retry\"")
+            .description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
     public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             PROP_METHOD,
             PROP_URL,
@@ -339,7 +347,8 @@ public final class InvokeHTTP extends AbstractProcessor {
             PROP_TRUSTED_HOSTNAME,
             PROP_ADD_HEADERS_TO_REQUEST,
             PROP_CONTENT_TYPE,
-            PROP_USE_CHUNKED_ENCODING));
+            PROP_USE_CHUNKED_ENCODING,
+            PROP_PENALIZE_NO_RETRY));
 
     // relationships
     public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
@@ -844,6 +853,9 @@ public final class InvokeHTTP extends AbstractProcessor {
             // 1xx, 3xx, 4xx -> NO RETRY
         } else {
             if (request != null) {
+                if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) {
+                    request = session.penalize(request);
+                }
                 session.transfer(request, REL_NO_RETRY);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index d889dee..4497a8b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -107,6 +107,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -153,6 +154,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         //expected in request status.code and status.message
         //original flow file (+attributes)

http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
index a78fb97..d0f29b9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
@@ -114,6 +114,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -166,6 +167,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -204,6 +206,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -244,6 +247,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -285,6 +289,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -325,6 +330,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -362,6 +368,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+all attributes from response)
@@ -401,6 +408,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code, status.message and body of response in attribute
         // original flow file (+attributes)
@@ -428,6 +436,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in response
         // status code, status message, all headers from server response --> ff attributes
@@ -455,6 +464,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION");
 
@@ -465,6 +475,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
     }
 
     @Test
@@ -483,6 +494,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request
         // status code, status message, no ff content
@@ -516,6 +528,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -572,6 +585,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in request status.code and status.message
         // original flow file (+attributes)
@@ -607,6 +621,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         //expected in request status.code and status.message
         //original flow file (+attributes)
@@ -646,6 +661,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         //expected in request status.code and status.message
         //original flow file (+attributes)
@@ -686,6 +702,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(1);
 
         // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0);
@@ -714,7 +731,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
-        // getMyFlowFiles();
+        runner.assertPenalizeCount(0);
         // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -741,7 +758,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
-        // getMyFlowFiles();
+        runner.assertPenalizeCount(0);
         // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -768,7 +785,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
-        // getMyFlowFiles();
+        runner.assertPenalizeCount(0);
         // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -783,6 +800,34 @@ public abstract class TestInvokeHttpCommon {
     }
 
     @Test
+    public void test400WithPenalizeNoRetry() throws Exception {
+        addHandler(new GetOrHeadHandler());
+
+        runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400");
+        runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, "true");
+
+        createFlowFiles(runner);
+
+        runner.run();
+        runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(1);
+        // expected in response
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+        final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400");
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request");
+        bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400");
+        final String expected = "Hello";
+        Assert.assertEquals(expected, actual);
+        bundle.assertAttributeEquals("Foo", "Bar");
+    }
+
+    @Test
     public void test412() throws Exception {
         addHandler(new GetOrHeadHandler());
 
@@ -797,6 +842,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
@@ -826,6 +872,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
         bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -859,6 +906,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
         bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -997,6 +1045,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
         bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -1031,6 +1080,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
         bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -1063,6 +1113,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
         bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -1094,6 +1145,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         //expected in request status.code and status.message
         //original flow file (+attributes)
@@ -1130,6 +1182,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
+        runner.assertPenalizeCount(1);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
 
@@ -1155,6 +1208,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
+        runner.assertPenalizeCount(1);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
 
@@ -1179,6 +1233,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
+        runner.assertPenalizeCount(1);
 
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
 
@@ -1204,6 +1259,7 @@ public abstract class TestInvokeHttpCommon {
         runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
         runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+        runner.assertPenalizeCount(0);
 
         //expected in request status.code and status.message
         //original flow file (+attributes)