You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/02/08 17:58:14 UTC
nifi git commit: NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry relationship
Repository: nifi
Updated Branches:
refs/heads/master 2673370cb -> eb6f9f0fe
NIFI-1423 Allow to penalize FlowFiles that are routed to No Retry relationship
This closes #183
Signed-off-by: jpercivall <jo...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/eb6f9f0f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/eb6f9f0f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/eb6f9f0f
Branch: refs/heads/master
Commit: eb6f9f0fec3042350864afee17a67ea62ef5f37a
Parents: 2673370
Author: ledor473 <le...@gmail.com>
Authored: Thu Jan 21 17:15:34 2016 -0500
Committer: jpercivall <jo...@yahoo.com>
Committed: Mon Feb 8 11:20:08 2016 -0500
----------------------------------------------------------------------
.../apache/nifi/util/MockProcessSession.java | 34 +++++++++--
.../nifi/util/StandardProcessorTestRunner.java | 22 +++++++
.../java/org/apache/nifi/util/TestRunner.java | 15 +++++
.../nifi/processors/standard/InvokeHTTP.java | 14 ++++-
.../processors/standard/TestInvokeHTTP.java | 2 +
.../standard/util/TestInvokeHttpCommon.java | 62 +++++++++++++++++++-
6 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 67cf16e..ea45dbf 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -61,6 +61,7 @@ public class MockProcessSession implements ProcessSession {
private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>();
private final MockFlowFileQueue processorQueue;
private final Set<Long> beingProcessed = new HashSet<>();
+ private final List<MockFlowFile> penalized = new ArrayList<>();
private final Map<Long, MockFlowFile> currentVersions = new HashMap<>();
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
@@ -429,11 +430,22 @@ public class MockProcessSession implements ProcessSession {
@Override
public void remove(final FlowFile flowFile) {
validateState(flowFile);
- final Iterator<Long> itr = beingProcessed.iterator();
- while (itr.hasNext()) {
- final Long ffId = itr.next();
+
+ final Iterator<MockFlowFile> penalizedItr = penalized.iterator();
+ while (penalizedItr.hasNext()) {
+ final MockFlowFile ff = penalizedItr.next();
+ if (Objects.equals(ff.getId(), flowFile.getId())) {
+ penalizedItr.remove();
+ penalized.remove(ff);
+ break;
+ }
+ }
+
+ final Iterator<Long> processedItr = beingProcessed.iterator();
+ while (processedItr.hasNext()) {
+ final Long ffId = processedItr.next();
if (ffId != null && ffId.equals(flowFile.getId())) {
- itr.remove();
+ processedItr.remove();
beingProcessed.remove(ffId);
removedCount++;
currentVersions.remove(ffId);
@@ -522,6 +534,9 @@ public class MockProcessSession implements ProcessSession {
for (final List<MockFlowFile> list : transferMap.values()) {
for (final MockFlowFile flowFile : list) {
processorQueue.offer(flowFile);
+ if (penalize) {
+ penalized.add(flowFile);
+ }
}
}
@@ -529,6 +544,9 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile flowFile = originalVersions.get(flowFileId);
if (flowFile != null) {
processorQueue.offer(flowFile);
+ if (penalize) {
+ penalized.add(flowFile);
+ }
}
}
@@ -538,6 +556,9 @@ public class MockProcessSession implements ProcessSession {
originalVersions.clear();
transferMap.clear();
clearTransferState();
+ if (!penalize) {
+ penalized.clear();
+ }
}
@Override
@@ -696,6 +717,10 @@ public class MockProcessSession implements ProcessSession {
return list;
}
+ public List<MockFlowFile> getPenalizedFlowFiles() {
+ return penalized;
+ }
+
/**
* @param relationship to get flowfiles for
* @return a List of FlowFiles in the order in which they were transferred
@@ -1013,6 +1038,7 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setPenalized();
+ penalized.add(newFlowFile);
return newFlowFile;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7358b42..c7cded1 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -336,6 +336,11 @@ public class StandardProcessorTestRunner implements TestRunner {
}
@Override
+ public void assertPenalizeCount(final int count) {
+ Assert.assertEquals(count, getPenalizedFlowFiles().size());
+ }
+
+ @Override
public void assertValid() {
context.assertValid();
}
@@ -453,6 +458,23 @@ public class StandardProcessorTestRunner implements TestRunner {
return flowFiles;
}
+ @Override
+ public List<MockFlowFile> getPenalizedFlowFiles() {
+ final List<MockFlowFile> flowFiles = new ArrayList<>();
+ for (final MockProcessSession session : sessionFactory.getCreatedSessions()) {
+ flowFiles.addAll(session.getPenalizedFlowFiles());
+ }
+
+ Collections.sort(flowFiles, new Comparator<MockFlowFile>() {
+ @Override
+ public int compare(final MockFlowFile o1, final MockFlowFile o2) {
+ return Long.compare(o1.getCreationTime(), o2.getCreationTime());
+ }
+ });
+
+ return flowFiles;
+ }
+
/**
* @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted.
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 5e45299..d1211ef 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -291,6 +291,14 @@ public interface TestRunner {
void assertTransferCount(String relationship, int count);
/**
+ * Assert that the number of FlowFiles that were penalized is equal to the given count
+ *
+ * @param count
+ * number of expected penalized
+ */
+ void assertPenalizeCount(int count);
+
+ /**
* Assert that there are no FlowFiles left on the input queue.
*/
void assertQueueEmpty();
@@ -438,6 +446,13 @@ public interface TestRunner {
List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship);
/**
+ * Returns a List of FlowFiles in the order in which they were transferred that were penalized
+ *
+ * @return flowfiles that were penalized
+ */
+ List<MockFlowFile> getPenalizedFlowFiles();
+
+ /**
* @return the {@link ProvenanceReporter} that will be used by the
* configured {@link Processor} for reporting Provenance Events
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 7576be3..4470bf6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -319,6 +319,14 @@ public final class InvokeHTTP extends AbstractProcessor {
.allowableValues("true", "false")
.build();
+ public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder()
+ .name("Penalize on \"No Retry\"")
+ .description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.")
+ .required(false)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
+
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
@@ -339,7 +347,8 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_TRUSTED_HOSTNAME,
PROP_ADD_HEADERS_TO_REQUEST,
PROP_CONTENT_TYPE,
- PROP_USE_CHUNKED_ENCODING));
+ PROP_USE_CHUNKED_ENCODING,
+ PROP_PENALIZE_NO_RETRY));
// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
@@ -844,6 +853,9 @@ public final class InvokeHTTP extends AbstractProcessor {
// 1xx, 3xx, 4xx -> NO RETRY
} else {
if (request != null) {
+ if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) {
+ request = session.penalize(request);
+ }
session.transfer(request, REL_NO_RETRY);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index d889dee..4497a8b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -107,6 +107,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -153,6 +154,7 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
http://git-wip-us.apache.org/repos/asf/nifi/blob/eb6f9f0f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
index a78fb97..d0f29b9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
@@ -114,6 +114,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -166,6 +167,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -204,6 +206,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -244,6 +247,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -285,6 +289,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -325,6 +330,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -362,6 +368,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+all attributes from response)
@@ -401,6 +408,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code, status.message and body of response in attribute
// original flow file (+attributes)
@@ -428,6 +436,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in response
// status code, status message, all headers from server response --> ff attributes
@@ -455,6 +464,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION");
@@ -465,6 +475,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
}
@Test
@@ -483,6 +494,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request
// status code, status message, no ff content
@@ -516,6 +528,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -572,6 +585,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in request status.code and status.message
// original flow file (+attributes)
@@ -607,6 +621,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
@@ -646,6 +661,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
@@ -686,6 +702,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(1);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0);
@@ -714,7 +731,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- // getMyFlowFiles();
+ runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -741,7 +758,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- // getMyFlowFiles();
+ runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -768,7 +785,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- // getMyFlowFiles();
+ runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -783,6 +800,34 @@ public abstract class TestInvokeHttpCommon {
}
@Test
+ public void test400WithPenalizeNoRetry() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400");
+ runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, "true");
+
+ createFlowFiles(runner);
+
+ runner.run();
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(1);
+ // expected in response
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request");
+ bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400");
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+ bundle.assertAttributeEquals("Foo", "Bar");
+ }
+
+ @Test
public void test412() throws Exception {
addHandler(new GetOrHeadHandler());
@@ -797,6 +842,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
// expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
@@ -826,6 +872,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -859,6 +906,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -997,6 +1045,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -1031,6 +1080,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -1063,6 +1113,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
@@ -1094,6 +1145,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)
@@ -1130,6 +1182,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
+ runner.assertPenalizeCount(1);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
@@ -1155,6 +1208,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
+ runner.assertPenalizeCount(1);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
@@ -1179,6 +1233,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
+ runner.assertPenalizeCount(1);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
@@ -1204,6 +1259,7 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertPenalizeCount(0);
//expected in request status.code and status.message
//original flow file (+attributes)