You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:57 UTC
[31/50] [abbrv] nifi git commit: NIFI-10: Added FETCH Provenance
Event and updated processors to use this new event type
NIFI-10: Added FETCH Provenance Event and updated processors to use this new event type
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/51f56402
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/51f56402
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/51f56402
Branch: refs/heads/NIFI-655
Commit: 51f564024a2fbe7fbd08760635561f08619be0e4
Parents: aec32a2
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Oct 15 17:00:20 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 14:58:50 2015 -0400
----------------------------------------------------------------------
.../nifi/provenance/ProvenanceEventType.java | 26 ++-
.../nifi/provenance/ProvenanceReporter.java | 37 ++++
.../nifi/util/MockProvenanceReporter.java | 35 ++-
.../nifi/processors/aws/s3/FetchS3Object.java | 2 +-
.../repository/StandardProvenanceReporter.java | 31 ++-
.../nifi/processors/standard/InvokeHTTP.java | 211 +++++++++----------
.../processors/standard/TestInvokeHTTP.java | 54 ++---
7 files changed, 258 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
index e5e47b7..188e8fc 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
@@ -23,45 +23,66 @@ public enum ProvenanceEventType {
* not received from a remote system or external process
*/
CREATE,
+
/**
- * Indicates a provenance event for receiving data from an external process
+ * Indicates a provenance event for receiving data from an external process. This Event Type
+ * is expected to be the first event for a FlowFile. As such, a Processor that receives data
+ * from an external source and uses that data to replace the content of an existing FlowFile
+ * should use the {@link #FETCH} event type, rather than the RECEIVE event type.
*/
RECEIVE,
+
+ /**
+ * Indicates that the contents of a FlowFile were overwritten using the contents of some
+ * external resource. This is similar to the {@link #RECEIVE} event but differs in that
+ * RECEIVE events are intended to be used as the event that introduces the FlowFile into
+ * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
+ * were overwritten.
+ */
+ FETCH,
+
/**
* Indicates a provenance event for sending data to an external process
*/
SEND,
+
/**
* Indicates a provenance event for the conclusion of an object's life for
* some reason other than object expiration
*/
DROP,
+
/**
* Indicates a provenance event for the conclusion of an object's life due
* to the fact that the object could not be processed in a timely manner
*/
EXPIRE,
+
/**
* FORK is used to indicate that one or more FlowFile was derived from a
* parent FlowFile.
*/
FORK,
+
/**
* JOIN is used to indicate that a single FlowFile is derived from joining
* together multiple parent FlowFiles.
*/
JOIN,
+
/**
* CLONE is used to indicate that a FlowFile is an exact duplicate of its
* parent FlowFile.
*/
CLONE,
+
/**
* CONTENT_MODIFIED is used to indicate that a FlowFile's content was
* modified in some way. When using this Event Type, it is advisable to
* provide details about how the content is modified.
*/
CONTENT_MODIFIED,
+
/**
* ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
* modified in some way. This event is not needed when another event is
@@ -69,17 +90,20 @@ public enum ProvenanceEventType {
* FlowFile attributes.
*/
ATTRIBUTES_MODIFIED,
+
/**
* ROUTE is used to show that a FlowFile was routed to a specified
* {@link org.apache.nifi.processor.Relationship Relationship} and should provide
* information about why the FlowFile was routed to this relationship.
*/
ROUTE,
+
/**
* Indicates a provenance event for adding additional information such as a
* new linkage to a new URI or UUID
*/
ADDINFO,
+
/**
* Indicates a provenance event for replaying a FlowFile. The UUID of the
* event will indicate the UUID of the original FlowFile that is being
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
index db589f8..0fd29fd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
@@ -124,6 +124,43 @@ public interface ProvenanceReporter {
void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis);
/**
+ * Emits a Provenance Event of type
+ * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
+ * FlowFile was overwritten with the data received from an external source.
+ *
+ * @param flowFile the FlowFile whose content was replaced
+ * @param transitUri a URI that identifies the system and protocol over which
+ * the transfer occurred
+ */
+ void fetch(FlowFile flowFile, String transitUri);
+
+ /**
+ * Emits a Provenance Event of type
+ * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
+ * FlowFile was overwritten with the data received from an external source.
+ *
+ * @param flowFile the FlowFile whose content was replaced
+ * @param transitUri a URI that identifies the system and protocol over which
+ * the transfer occurred
+ * @param transmissionMillis the number of milliseconds taken to transfer the data
+ */
+ void fetch(FlowFile flowFile, String transitUri, long transmissionMillis);
+
+ /**
+ * Emits a Provenance Event of type
+ * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
+ * FlowFile was overwritten with the data received from an external source.
+ *
+ * @param flowFile the FlowFile whose content was replaced
+ * @param transitUri a URI that identifies the system and protocol over which
+ * the transfer occurred
+ * @param details details about the event
+ * @param transmissionMillis the number of milliseconds taken to transfer
+ * the data
+ */
+ void fetch(FlowFile flowFile, String transitUri, String details, long transmissionMillis);
+
+ /**
* Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND}
* that indicates that a copy of the given FlowFile was sent to an external
* destination. The external destination may be a remote system or may be a
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
index 8c9a320..8458715 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
@@ -124,7 +124,40 @@ public class MockProvenanceReporter implements ProvenanceReporter {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
- .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+ .setTransitUri(transitUri)
+ .setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier)
+ .setEventDuration(transmissionMillis)
+ .setDetails(details)
+ .build();
+ events.add(record);
+ } catch (final Exception e) {
+ logger.error("Failed to generate Provenance Event due to " + e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ }
+ }
+
+ @Override
+ public void fetch(final FlowFile flowFile, final String transitUri) {
+ fetch(flowFile, transitUri, -1L);
+ }
+
+ @Override
+ public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
+ fetch(flowFile, transitUri, null, transmissionMillis);
+ }
+
+ @Override
+ public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
+ verifyFlowFileKnown(flowFile);
+
+ try {
+ final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
+ .setTransitUri(transitUri)
+ .setEventDuration(transmissionMillis)
+ .setDetails(details)
+ .build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 131e671..bc6aeec 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -152,7 +152,7 @@ public class FetchS3Object extends AbstractS3Processor {
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
- session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
+ session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 8852f42..8a89dbf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -124,7 +124,36 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
- .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+ .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+ events.add(record);
+ } catch (final Exception e) {
+ logger.error("Failed to generate Provenance Event due to " + e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ }
+ }
+
+ @Override
+ public void fetch(final FlowFile flowFile, final String transitUri) {
+ fetch(flowFile, transitUri, -1L);
+ }
+
+ @Override
+ public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
+ fetch(flowFile, transitUri, null, transmissionMillis);
+ }
+
+ @Override
+ public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
+ verifyFlowFileKnown(flowFile);
+
+ try {
+ final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
+ .setTransitUri(transitUri)
+ .setEventDuration(transmissionMillis)
+ .setDetails(details)
+ .build();
events.add(record);
} catch (final Exception e) {
logger.error("Failed to generate Provenance Event due to " + e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index c7be728..848652a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -89,7 +89,7 @@ import org.joda.time.format.DateTimeFormatter;
@WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
@WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")})
@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
- + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
+ + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
public final class InvokeHTTP extends AbstractProcessor {
@Override
@@ -170,76 +170,75 @@ public final class InvokeHTTP extends AbstractProcessor {
// This set includes our strings defined above as well as some standard flowfile
// attributes.
public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
- STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
- "uuid", "filename", "path"
- )));
+ STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
+ "uuid", "filename", "path")));
// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
- .name("HTTP Method")
- .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
- .required(true)
- .defaultValue("GET")
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("HTTP Method")
+ .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
+ .required(true)
+ .defaultValue("GET")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
- .name("Remote URL")
- .description("Remote URL which will be connected to, including scheme, host, port, path.")
- .required(true)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.URL_VALIDATOR)
- .build();
+ .name("Remote URL")
+ .description("Remote URL which will be connected to, including scheme, host, port, path.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Connection Timeout")
- .description("Max wait time for connection to remote service.")
- .required(true)
- .defaultValue("5 secs")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
+ .name("Connection Timeout")
+ .description("Max wait time for connection to remote service.")
+ .required(true)
+ .defaultValue("5 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Read Timeout")
- .description("Max wait time for response from remote service.")
- .required(true)
- .defaultValue("15 secs")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
+ .name("Read Timeout")
+ .description("Max wait time for response from remote service.")
+ .required(true)
+ .defaultValue("15 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
- .name("Include Date Header")
- .description("Include an RFC-2616 Date header in the request.")
- .required(true)
- .defaultValue("True")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
+ .name("Include Date Header")
+ .description("Include an RFC-2616 Date header in the request.")
+ .required(true)
+ .defaultValue("True")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
- .name("Follow Redirects")
- .description("Follow HTTP redirects issued by remote server.")
- .required(true)
- .defaultValue("True")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
+ .name("Follow Redirects")
+ .description("Follow HTTP redirects issued by remote server.")
+ .required(true)
+ .defaultValue("True")
+ .allowableValues("True", "False")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
- .name("Attributes to Send")
- .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
- + "If not defined, no attributes are sent as headers.")
- .required(false)
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .build();
+ .name("Attributes to Send")
+ .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
+ + "If not defined, no attributes are sent as headers.")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+ .name("SSL Context Service")
+ .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
@@ -256,33 +255,33 @@ public final class InvokeHTTP extends AbstractProcessor {
.build();
// Per RFC 7235, 2617, and 2616.
- // basic-credentials = base64-user-pass
- // base64-user-pass = userid ":" password
- // userid = *<TEXT excluding ":">
- // password = *TEXT
+ // basic-credentials = base64-user-pass
+ // base64-user-pass = userid ":" password
+ // userid = *<TEXT excluding ":">
+ // password = *TEXT
//
- // OCTET = <any 8-bit sequence of data>
- // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
- // LWS = [CRLF] 1*( SP | HT )
- // TEXT = <any OCTET except CTLs but including LWS>
+ // OCTET = <any 8-bit sequence of data>
+ // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
+ // LWS = [CRLF] 1*( SP | HT )
+ // TEXT = <any OCTET except CTLs but including LWS>
//
// Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
- .name("Basic Authentication Username")
- .displayName("Basic Authentication Username")
- .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).")
- .required(false)
- .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
- .build();
+ .name("Basic Authentication Username")
+ .displayName("Basic Authentication Username")
+ .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).")
+ .required(false)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
+ .build();
public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
- .name("Basic Authentication Password")
- .displayName("Basic Authentication Password")
- .description("The password to be used by the client to authenticate against the Remote URL.")
- .required(false)
- .sensitive(true)
- .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
- .build();
+ .name("Basic Authentication Password")
+ .displayName("Basic Authentication Password")
+ .description("The password to be used by the client to authenticate against the Remote URL.")
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
+ .build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
@@ -296,48 +295,46 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_BASIC_AUTH_USERNAME,
PROP_BASIC_AUTH_PASSWORD,
PROP_PROXY_HOST,
- PROP_PROXY_PORT
- ));
+ PROP_PROXY_PORT));
// property to allow the hostname verifier to be overridden
// this is a "hidden" property - it's configured using a dynamic user property
public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
- .name("Trusted Hostname")
- .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
- + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dynamic(true)
- .build();
+ .name("Trusted Hostname")
+ .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
+ + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamic(true)
+ .build();
// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
- .name("Original")
- .description("Original FlowFile will be routed upon success (2xx status codes).")
- .build();
+ .name("Original")
+ .description("Original FlowFile will be routed upon success (2xx status codes).")
+ .build();
public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder()
- .name("Response")
- .description("Response FlowFile will be routed upon success (2xx status codes).")
- .build();
+ .name("Response")
+ .description("Response FlowFile will be routed upon success (2xx status codes).")
+ .build();
public static final Relationship REL_RETRY = new Relationship.Builder()
- .name("Retry")
- .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
- .build();
+ .name("Retry")
+ .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
+ .build();
public static final Relationship REL_NO_RETRY = new Relationship.Builder()
- .name("No Retry")
- .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
- .build();
+ .name("No Retry")
+ .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
+ .build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("Failure")
- .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
- .build();
+ .name("Failure")
+ .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
+ .build();
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
- REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE
- )));
+ REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
}
@@ -403,7 +400,7 @@ public final class InvokeHTTP extends AbstractProcessor {
transfer();
} catch (final Exception e) {
// log exception
- logger.error("Routing to {} due to exception: {}", new Object[] { REL_FAILURE.getName(), e }, e);
+ logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e);
// penalize
request = session.penalize(request);
@@ -417,7 +414,7 @@ public final class InvokeHTTP extends AbstractProcessor {
session.remove(response);
}
} catch (final Exception e1) {
- logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] { e1 }, e1);
+ logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1);
}
}
}
@@ -545,7 +542,7 @@ public final class InvokeHTTP extends AbstractProcessor {
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().modifyContent(response, "Updated content with data received from " + conn.getURL().toExternalForm(), millis);
+ session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis);
}
}
@@ -562,7 +559,7 @@ public final class InvokeHTTP extends AbstractProcessor {
// log the status codes from the response
logger.info("Request to {} returned status code {} for {}",
- new Object[]{conn.getURL().toExternalForm(), statusCode, request});
+ new Object[] {conn.getURL().toExternalForm(), statusCode, request});
// transfer to the correct relationship
// 2xx -> SUCCESS
@@ -660,12 +657,12 @@ public final class InvokeHTTP extends AbstractProcessor {
private void logRequest() {
logger.debug("\nRequest to remote service:\n\t{}\n{}",
- new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
+ new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
}
private void logResponse() {
logger.debug("\nResponse from remote service:\n\t{}\n{}",
- new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
+ new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
}
private String getLogString(Map<String, List<String>> map) {
@@ -753,7 +750,7 @@ public final class InvokeHTTP extends AbstractProcessor {
return new BufferedInputStream(is);
} catch (IOException e) {
- logger.warn("Response stream threw an exception: {}", new Object[]{e}, e);
+ logger.warn("Response stream threw an exception: {}", new Object[] {e}, e);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index 46cacca..a4fd3d7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -149,8 +149,8 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //expected in request status.code and status.message
- //original flow file (+attributes)??????????
+ // expected in request status.code and status.message
+ // original flow file (+attributes)??????????
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
@@ -159,10 +159,10 @@ public class TestInvokeHTTP {
Assert.assertEquals(expected, actual);
bundle.assertAttributeEquals("Foo", "Bar");
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- //should not contain any original ff attributes
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ // should not contain any original ff attributes
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
bundle1.assertAttributeEquals(Config.STATUS_CODE, "200");
@@ -198,8 +198,8 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //expected in request status.code and status.message
- //original flow file (+attributes)??????????
+ // expected in request status.code and status.message
+ // original flow file (+attributes)??????????
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
@@ -208,10 +208,10 @@ public class TestInvokeHTTP {
final String expected = "Hello";
Assert.assertEquals(expected, actual);
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- //should not contain any original ff attributes
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ // should not contain any original ff attributes
final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings
@@ -223,17 +223,17 @@ public class TestInvokeHTTP {
final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
assertEquals(2, provEvents.size());
boolean forkEvent = false;
- boolean contentModEvent = false;
+ boolean fetchEvent = false;
for (final ProvenanceEventRecord event : provEvents) {
if (event.getEventType() == ProvenanceEventType.FORK) {
forkEvent = true;
- } else if (event.getEventType() == ProvenanceEventType.CONTENT_MODIFIED) {
- contentModEvent = true;
+ } else if (event.getEventType() == ProvenanceEventType.FETCH) {
+ fetchEvent = true;
}
}
assertTrue(forkEvent);
- assertTrue(contentModEvent);
+ assertTrue(fetchEvent);
}
@Test
@@ -257,8 +257,8 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //expected in request status.code and status.message
- //original flow file (+attributes)??????????
+ // expected in request status.code and status.message
+ // original flow file (+attributes)??????????
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
bundle.assertAttributeEquals(Config.STATUS_CODE, "401");
bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Unauthorized");
@@ -286,7 +286,7 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_NO_RETRY, 0);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //expected in response
+ // expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
bundle.assertAttributeEquals(Config.STATUS_CODE, "500");
@@ -313,8 +313,8 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_RETRY, 0);
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //getMyFlowFiles();
- //expected in response
+ // getMyFlowFiles();
+ // expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -340,8 +340,8 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_RETRY, 0);
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //getMyFlowFiles();
- //expected in response
+ // getMyFlowFiles();
+ // expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -367,8 +367,8 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_RETRY, 0);
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //getMyFlowFiles();
- //expected in response
+ // getMyFlowFiles();
+ // expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -397,7 +397,7 @@ public class TestInvokeHTTP {
runner.assertTransferCount(Config.REL_NO_RETRY, 1);
runner.assertTransferCount(Config.REL_FAILURE, 0);
- //expected in response
+ // expected in response
final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
@@ -593,7 +593,7 @@ public class TestInvokeHTTP {
@Override
public void handle(String target, Request baseRequest,
- HttpServletRequest request, HttpServletResponse response)
+ HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
baseRequest.setHandled(true);