You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/11/19 07:43:12 UTC
[2/2] nifi git commit: NIFI-1086 Provide refactoring of InvokeHTTP
NIFI-980 Add support for HTTP Digest authentication to InvokeHttp NIFI-1080
Provide additional InvokeHttp unit tests NIFI-1133 InvokeHTTP Processor does
not save Location header for 3xx r
NIFI-1086 Provide refactoring of InvokeHTTP
NIFI-980 Add support for HTTP Digest authentication to InvokeHttp
NIFI-1080 Provide additional InvokeHttp unit tests
NIFI-1133 InvokeHTTP Processor does not save Location header for 3xx responses
NIFI-1009 InvokeHTTP should be able to be scheduled without any incoming connection for GET operations
NIFI-61 Multiple improvements for InvokeHTTP inclusive of providing unique tx.id across clusters, dynamic HTTP header properties
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8c2323dc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8c2323dc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8c2323dc
Branch: refs/heads/master
Commit: 8c2323dc8d0e107f1a99898370c7515fa9603122
Parents: fb335ea
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Mon Nov 2 15:45:20 2015 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Nov 19 01:40:21 2015 -0500
----------------------------------------------------------------------
.../nifi-standard-processors/pom.xml | 52 +-
.../nifi/processors/standard/InvokeHTTP.java | 1119 ++++++++++--------
.../processors/standard/TestInvokeHTTP.java | 91 +-
.../processors/standard/TestInvokeHttpSSL.java | 3 +-
.../standard/util/TestInvokeHttpCommon.java | 1029 ++++++++++++----
pom.xml | 38 +
6 files changed, 1603 insertions(+), 729 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 76f9daf..0427927 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -126,11 +126,6 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-socket-utils</artifactId>
</dependency>
<dependency>
@@ -138,11 +133,6 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-load-distribution-service-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-distributed-cache-client-service</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
@@ -154,21 +144,11 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-ssl-context-service</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
<version>1.7</version>
@@ -189,7 +169,37 @@ language governing permissions and limitations under the License. -->
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>com.squareup.okhttp</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.burgstaller</groupId>
+ <artifactId>okhttp-digest</artifactId>
+ <version>0.4</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index d827658..2a9760d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -18,13 +18,8 @@ package org.apache.nifi.processors.standard;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Proxy.Type;
@@ -41,30 +36,49 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
-import org.apache.commons.codec.binary.Base64;
+import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
+import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
+import com.burgstaller.okhttp.DispatchingAuthenticator;
+import com.burgstaller.okhttp.digest.CachingAuthenticator;
+import com.burgstaller.okhttp.digest.DigestAuthenticator;
+
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+
+import com.squareup.okhttp.ResponseBody;
+import okio.BufferedSink;
+import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -72,118 +86,61 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.stream.io.StreamUtils;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@SupportsBatching
@Tags({"http", "https", "rest", "client"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@TriggerWhenEmpty
@CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.")
@WritesAttributes({
@WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
@WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
- @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
+ @WritesAttribute(attribute = "invokehttp.response.body", description = "In the instance where the status code received is not a success (2xx) "
+ + "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."),
@WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
@WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
- @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")})
-@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
- + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
+ @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server"),
+ @WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+ + "will become the attribute key and the value would be the body of the HTTP response.")})
+@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", supportsExpressionLanguage = true, description = "Send request header "
+ + "with a key matching the Dynamic Property Key and a value created by evaluating the Attribute Expression Language set in the value "
+ + "of the Dynamic Property.")
public final class InvokeHTTP extends AbstractProcessor {
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Config.PROPERTIES;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
- if (Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName)) {
- return Config.PROP_TRUSTED_HOSTNAME;
- }
- return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return Config.RELATIONSHIPS;
- }
-
- private volatile Pattern attributesToSend = null;
-
- @Override
- public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- final String trimmedValue = StringUtils.trimToEmpty(newValue);
-
- // compile the attributes-to-send filter pattern
- if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
- if (newValue.isEmpty()) {
- attributesToSend = null;
- } else {
- attributesToSend = Pattern.compile(trimmedValue);
- }
- }
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>(1);
- final boolean proxyHostSet = validationContext.getProperty(Config.PROP_PROXY_HOST).isSet();
- final boolean proxyPortSet = validationContext.getProperty(Config.PROP_PROXY_PORT).isSet();
-
- if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
- results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
- }
-
- return results;
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- final FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final SSLContextService sslService = context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
-
- Transaction transaction = new Transaction(getLogger(), sslContext, attributesToSend, context, session, flowFile);
- transaction.process();
- }
-
- /**
- * Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc.
- */
- public interface Config {
- // flowfile attribute keys returned after reading the response
- String STATUS_CODE = "invokehttp.status.code";
- String STATUS_MESSAGE = "invokehttp.status.message";
- String RESPONSE_BODY = "invokehttp.response.body";
- String REQUEST_URL = "invokehttp.request.url";
- String TRANSACTION_ID = "invokehttp.tx.id";
- String REMOTE_DN = "invokehttp.remote.dn";
-
- // Set of flowfile attributes which we generally always ignore during
- // processing, including when converting http headers, copying attributes, etc.
- // This set includes our strings defined above as well as some standard flowfile
- // attributes.
- public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ // flowfile attribute keys returned after reading the response
+ public final static String STATUS_CODE = "invokehttp.status.code";
+ public final static String STATUS_MESSAGE = "invokehttp.status.message";
+ public final static String RESPONSE_BODY = "invokehttp.response.body";
+ public final static String REQUEST_URL = "invokehttp.request.url";
+ public final static String TRANSACTION_ID = "invokehttp.tx.id";
+ public final static String REMOTE_DN = "invokehttp.remote.dn";
+
+ // Set of flowfile attributes which we generally always ignore during
+ // processing, including when converting http headers, copying attributes, etc.
+ // This set includes our strings defined above as well as some standard flowfile
+ // attributes.
+ public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
"uuid", "filename", "path")));
- // properties
- public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
+ // properties
+ public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
.name("HTTP Method")
- .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
+ .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported but will "
+ + "be sent without a message body.")
.required(true)
.defaultValue("GET")
.expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.build();
- public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
.name("Remote URL")
.description("Remote URL which will be connected to, including scheme, host, port, path.")
.required(true)
@@ -191,7 +148,7 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection Timeout")
.description("Max wait time for connection to remote service.")
.required(true)
@@ -199,7 +156,7 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
.name("Read Timeout")
.description("Max wait time for response from remote service.")
.required(true)
@@ -207,7 +164,7 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
.name("Include Date Header")
.description("Include an RFC-2616 Date header in the request.")
.required(true)
@@ -216,7 +173,7 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
.name("Follow Redirects")
.description("Follow HTTP redirects issued by remote server.")
.required(true)
@@ -225,48 +182,50 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
.name("Attributes to Send")
.description("Regular expression that defines which attributes to send as HTTP headers in the request. "
- + "If not defined, no attributes are sent as headers.")
+ + "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. "
+ + "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression "
+ + "language will be the header value.")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port")
.description("The port of the proxy server")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
- // Per RFC 7235, 2617, and 2616.
- // basic-credentials = base64-user-pass
- // base64-user-pass = userid ":" password
- // userid = *<TEXT excluding ":">
- // password = *TEXT
- //
- // OCTET = <any 8-bit sequence of data>
- // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
- // LWS = [CRLF] 1*( SP | HT )
- // TEXT = <any OCTET except CTLs but including LWS>
- //
- // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
- public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
+ // Per RFC 7235, 2617, and 2616.
+ // basic-credentials = base64-user-pass
+ // base64-user-pass = userid ":" password
+ // userid = *<TEXT excluding ":">
+ // password = *TEXT
+ //
+ // OCTET = <any 8-bit sequence of data>
+ // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
+ // LWS = [CRLF] 1*( SP | HT )
+ // TEXT = <any OCTET except CTLs but including LWS>
+ //
+ // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
+ public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
.name("Basic Authentication Username")
.displayName("Basic Authentication Username")
.description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).")
@@ -274,7 +233,7 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
.build();
- public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
.name("Basic Authentication Password")
.displayName("Basic Authentication Password")
.description("The password to be used by the client to authenticate against the Remote URL.")
@@ -283,7 +242,64 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
.build();
- public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
+ .name("Put Response Body In Attribute")
+ .description("If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate "
+ + "FlowFile. The attribute key to put to is determined by evaluating value of this property. ")
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
+ .name("Max Length To Put In Attribute")
+ .description("If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" "
+ + "property or by receiving an error status code), the number of characters put to the attribute value will be at "
+ + "most this amount. This is important because attributes are held in memory and large attributes will quickly "
+ + "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. "
+ + "Consider making this smaller if able.")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("256")
+ .build();
+
+ public static final PropertyDescriptor PROP_DIGEST_AUTH = new PropertyDescriptor.Builder()
+ .name("Digest Authentication")
+ .displayName("Use Digest Authentication")
+ .description("Whether to communicate with the website using Digest Authentication. 'Basic Authentication Username' and 'Basic Authentication Password' are used "
+ + "for authentication.")
+ .required(false)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder()
+ .name("Always Output Response")
+ .description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is "
+ + "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the "
+ + "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.")
+ .required(false)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
+ .name("Trusted Hostname")
+ .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. "
+ + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based "
+ + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder()
+ .name("Add Response Headers to Request")
+ .description("Enabling this property saves all the response headers to the original request. This may be when the response headers are needed "
+ + "but a response is not generated due to the status code received.")
+ .required(false)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
PROP_SSL_CONTEXT_SERVICE,
@@ -295,499 +311,626 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_BASIC_AUTH_USERNAME,
PROP_BASIC_AUTH_PASSWORD,
PROP_PROXY_HOST,
- PROP_PROXY_PORT));
-
- // property to allow the hostname verifier to be overridden
- // this is a "hidden" property - it's configured using a dynamic user property
- public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
- .name("Trusted Hostname")
- .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. "
- + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based "
- + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dynamic(true)
- .build();
-
- // relationships
- public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
+ PROP_PROXY_PORT,
+ PROP_PUT_OUTPUT_IN_ATTRIBUTE,
+ PROP_PUT_ATTRIBUTE_MAX_LENGTH,
+ PROP_DIGEST_AUTH,
+ PROP_OUTPUT_RESPONSE_REGARDLESS,
+ PROP_TRUSTED_HOSTNAME,
+ PROP_ADD_HEADERS_TO_REQUEST));
+
+ // relationships
+ public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
.name("Original")
- .description("Original FlowFile will be routed upon success (2xx status codes).")
+ .description("The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the "
+ + "success of the request.")
.build();
- public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder()
+ public static final Relationship REL_RESPONSE = new Relationship.Builder()
.name("Response")
- .description("Response FlowFile will be routed upon success (2xx status codes).")
+ .description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response Regardless' property "
+ + "is true then the response will be sent to this relationship regardless of the status code received.")
.build();
- public static final Relationship REL_RETRY = new Relationship.Builder()
+ public static final Relationship REL_RETRY = new Relationship.Builder()
.name("Retry")
- .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
+ .description("The original FlowFile will be routed on any status code that can be retried (5xx status codes). It will have new "
+ + "attributes detailing the request.")
.build();
- public static final Relationship REL_NO_RETRY = new Relationship.Builder()
+ public static final Relationship REL_NO_RETRY = new Relationship.Builder()
.name("No Retry")
- .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
+ .description("The original FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes). "
+ + "It will have new attributes detailing the request.")
.build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("Failure")
- .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
+ .description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. "
+ + "It will have new attributes detailing the request.")
.build();
- public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
- REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
+ public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
- }
+ private volatile Set<String> dynamicPropertyNames = new HashSet<>();
/**
- * A single invocation of an HTTP request/response from the InvokeHTTP processor. This class encapsulates the entirety of the flowfile processing.
- * <p>
- * This class is not thread safe and is created new for every flowfile processed.
+ * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
+ * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
*/
- private static class Transaction implements Config {
-
- /**
- * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
- * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
- */
- private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
- private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC();
-
- /**
- * Every request/response cycle from this client has a unique transaction id which will be stored as a flowfile attribute. This generator is used to create the id.
- */
- private static final AtomicLong txIdGenerator = new AtomicLong();
-
- private static final Charset utf8 = Charset.forName("UTF-8");
-
- private final ProcessorLog logger;
- private final SSLContext sslContext;
- private final Pattern attributesToSend;
- private final ProcessContext context;
- private final ProcessSession session;
-
- private final long txId = txIdGenerator.incrementAndGet();
- private final long startNanos = System.nanoTime();
-
- private FlowFile request;
- private FlowFile response;
- private HttpURLConnection conn;
-
- private int statusCode;
- private String statusMessage;
-
- public Transaction(
- final ProcessorLog logger,
- final SSLContext sslContext,
- final Pattern attributesToSend,
- final ProcessContext context,
- final ProcessSession session,
- final FlowFile request) {
-
- this.logger = logger;
- this.sslContext = sslContext;
- this.attributesToSend = attributesToSend;
- this.context = context;
- this.session = session;
- this.request = request;
- }
+ private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC();
+ private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
- public void process() {
- try {
- openConnection();
- sendRequest();
- readResponse();
- transfer();
- } catch (final Exception e) {
- // log exception
- logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e);
-
- // penalize
- request = session.penalize(request);
+ public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
- // transfer original to failure
- session.transfer(request, REL_FAILURE);
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
- // cleanup response flowfile, if applicable
- try {
- if (response != null) {
- session.remove(response);
- }
- } catch (final Exception e1) {
- logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1);
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .required(false)
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .build();
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ private volatile Pattern regexAttributesToSend = null;
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (descriptor.isDynamic()) {
+ final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
+ if (newValue == null) {
+ newDynamicPropertyNames.remove(descriptor.getName());
+ } else if (oldValue == null) { // new property
+ newDynamicPropertyNames.add(descriptor.getName());
+ }
+ this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
+ } else {
+ // compile the attributes-to-send filter pattern
+ if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
+ if (newValue.isEmpty()) {
+ regexAttributesToSend = null;
+ } else {
+ final String trimmedValue = StringUtils.trimToEmpty(newValue);
+ regexAttributesToSend = Pattern.compile(trimmedValue);
}
}
}
+ }
- private void openConnection() throws IOException {
- // read the url property from the context
- final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
- final URL url = new URL(urlstr);
- final String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
- final String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
-
- String authstrencoded = null;
- if (!authuser.isEmpty()) {
- String authstr = authuser + ":" + authpass;
- byte[] bytestrencoded = Base64.encodeBase64(authstr.getBytes(StandardCharsets.UTF_8));
- authstrencoded = new String(bytestrencoded, StandardCharsets.UTF_8);
- }
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(1);
+ final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet();
+ final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet();
- // create the connection
- final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
- final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
- if (proxyHost == null || proxyPort == null) {
- conn = (HttpURLConnection) url.openConnection();
- } else {
- final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
- conn = (HttpURLConnection) url.openConnection(proxy);
- }
+ if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+ results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+ }
- if (authstrencoded != null) {
- conn.setRequestProperty("Authorization", "Basic " + authstrencoded);
- }
+ return results;
+ }
- // set the request method
- String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase();
- conn.setRequestMethod(method);
+ @OnScheduled
+ public void setUpClient(final ProcessContext context) throws IOException {
+ okHttpClientAtomicReference.set(null);
- // set timeouts
- conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
- conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ OkHttpClient okHttpClient = new OkHttpClient();
- // set whether to follow redirects
- conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
+ // Add a proxy if set
+ final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
+ final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
+ if (proxyHost != null && proxyPort != null) {
+ final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+ okHttpClient.setProxy(proxy);
+ }
- // special handling for https
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection sconn = (HttpsURLConnection) conn;
+ // Set timeouts
+ okHttpClient.setConnectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
+ okHttpClient.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
- // check if the ssl context is set
- if (sslContext != null) {
- sconn.setSSLSocketFactory(sslContext.getSocketFactory());
- }
+ // Set whether to follow redirects
+ okHttpClient.setFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
- // check the trusted hostname property and override the HostnameVerifier
- String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
- if (!trustedHostname.isEmpty()) {
- sconn.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier()));
- }
- }
+ final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
+ // check if the ssl context is set and add the factory if so
+ if (sslContext != null) {
+ okHttpClient.setSslSocketFactory(sslContext.getSocketFactory());
}
- private void sendRequest() throws IOException {
- // set the http request properties using flowfile attribute values
- setRequestProperties();
+ // check the trusted hostname property and override the HostnameVerifier
+ String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
+ if (!trustedHostname.isEmpty()) {
+ okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier()));
+ }
- // log request
- logRequest();
+ final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
- // we only stream data for POST and PUT requests
- String method = conn.getRequestMethod().toUpperCase();
- if ("POST".equals(method) || "PUT".equals(method)) {
- conn.setDoOutput(true);
- conn.setFixedLengthStreamingMode(request.getSize());
+ // If the username/password properties are set then check if digest auth is being used
+ if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
+ final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
- // write the flowfile contents to the output stream
- try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) {
- session.exportTo(request, os);
- }
+ /*
+ * Currently OkHttp doesn't have built-in Digest Auth Support. The ticket for adding it is here:
+ * https://github.com/square/okhttp/issues/205#issuecomment-154047052
+ * Once added this should be refactored to use the built in support. For now, a third party lib is needed.
+ */
+ final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>();
+
+ com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
+
+ final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
+
+ DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder()
+ .with("Digest", digestAuthenticator)
+ .build();
+
+ okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache));
+ okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache));
+ }
- // emit provenance event
- session.getProvenanceReporter().send(request, conn.getURL().toExternalForm());
+ okHttpClientAtomicReference.set(okHttpClient);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ OkHttpClient okHttpClient = okHttpClientAtomicReference.get();
+
+ FlowFile requestFlowFile = session.get();
+
+ // Checking to see if the property to put the body of the response in an attribute was set
+ boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet();
+ if (requestFlowFile == null) {
+ if(context.hasNonLoopConnection()){
+ return;
}
+ String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase();
+ if ("POST".equals(request) || "PUT".equals(request)) {
+ return;
+ } else if (putToAttribute) {
+ requestFlowFile = session.create();
+ }
}
- private void readResponse() throws IOException {
+ // Setting some initial variables
+ final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+ final ProcessorLog logger = getLogger();
+
+ // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
+ final UUID txId = UUID.randomUUID();
+
+ FlowFile responseFlowFile = null;
+ try {
+ // read the url property from the context
+ final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
+ final URL url = new URL(urlstr);
+
+ Request httpRequest = configureRequest(context, session, requestFlowFile, url);
+
+ // log request
+ logRequest(logger, httpRequest);
+
+ // emit send provenance event if successfully sent to the server
+ if (httpRequest.body() != null) {
+ session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true);
+ }
+
+ final long startNanos = System.nanoTime();
+ Response responseHttp = okHttpClient.newCall(httpRequest).execute();
// output the raw response headers (DEBUG level only)
- logResponse();
+ logResponse(logger, url, responseHttp);
// store the status code and message
- statusCode = conn.getResponseCode();
- statusMessage = conn.getResponseMessage();
+ int statusCode = responseHttp.code();
+ String statusMessage = responseHttp.message();
+
+ if (statusCode == 0) {
+ throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
+ }
+
+ // Create a map of the status attributes that are always written to the request and reponse FlowFiles
+ Map<String, String> statusAttributes = new HashMap<>();
+ statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
+ statusAttributes.put(STATUS_MESSAGE, statusMessage);
+ statusAttributes.put(REQUEST_URL, url.toExternalForm());
+ statusAttributes.put(TRANSACTION_ID, txId.toString());
- // always write the status attributes to the request flowfile
- request = writeStatusAttributes(request);
+ if (requestFlowFile != null) {
+ requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
+ }
- // read from the appropriate input stream
- try (InputStream is = getResponseStream()) {
+ // If the property to add the response headers to the request flowfile is true then add them
+ if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
+ // write the response headers as attributes
+ // this will overwrite any existing flowfile attributes
+ requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
+ }
- // if not successful, store the response body into a flowfile attribute
- if (!isSuccess()) {
- String body = trimToEmpty(toString(is, utf8));
- request = session.putAttribute(request, RESPONSE_BODY, body);
+ boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
+ boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean();
+ ResponseBody responseBody = responseHttp.body();
+ boolean bodyExists = responseBody != null;
+
+ InputStream responseBodyStream = null;
+ SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
+ TeeInputStream teeInputStream = null;
+ try {
+ responseBodyStream = bodyExists ? responseBody.byteStream() : null;
+ if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
+ outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
+ teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
}
- // if successful, store the response body as the flowfile payload
- // we include additional flowfile attributes including the reponse headers
- // and the status codes.
- if (isSuccess()) {
+ if (outputBodyToResponseContent) {
+ /*
+ * If successful and putting to response flowfile, store the response body as the flowfile payload
+ * we include additional flowfile attributes including the response headers and the status codes.
+ */
+
// clone the flowfile to capture the response
- response = session.create(request);
+ if (requestFlowFile != null) {
+ responseFlowFile = session.create(requestFlowFile);
+ } else {
+ responseFlowFile = session.create();
+ }
// write the status attributes
- response = writeStatusAttributes(response);
+ responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
- response = session.putAllAttributes(response, convertAttributesFromHeaders());
+ responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
// transfer the message body to the payload
// can potentially be null in edge cases
- if (is != null) {
- response = session.importFrom(is, response);
+ if (bodyExists) {
+ if (teeInputStream != null) {
+ responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
+ } else {
+ responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
+ }
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis);
+ session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
+ }
+ }
+
+ // if not successful and request flowfile is not null, store the response body into a flowfile attribute
+ if (outputBodyToRequestAttribute && bodyExists) {
+ String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue();
+ if (attributeKey == null) {
+ attributeKey = RESPONSE_BODY;
}
+ byte[] outputBuffer;
+ int size;
+
+ if (outputStreamToRequestAttribute != null) {
+ outputBuffer = outputStreamToRequestAttribute.getBuffer();
+ size = outputStreamToRequestAttribute.size();
+ } else {
+ outputBuffer = new byte[maxAttributeSize];
+ size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
+ }
+ String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
+ requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to "
+ + url.toExternalForm() + ". It took " + millis + "millis,");
+ }
+ } finally {
+ if(outputStreamToRequestAttribute != null){
+ outputStreamToRequestAttribute.close();
+ outputStreamToRequestAttribute = null;
}
+ if(teeInputStream != null){
+ teeInputStream.close();
+ teeInputStream = null;
+ } else if(responseBodyStream != null){
+ responseBodyStream.close();
+ responseBodyStream = null;
+ }
+ }
+ route(requestFlowFile, responseFlowFile, session, context, statusCode);
+ } catch (final Exception e) {
+ // penalize or yield
+ if (requestFlowFile != null) {
+ logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+ requestFlowFile = session.penalize(requestFlowFile);
+ // transfer original to failure
+ session.transfer(requestFlowFile, REL_FAILURE);
+ } else {
+ logger.error("Yielding processor due to exception encountered as a source processor: {}", e);
+ context.yield();
}
- }
- private void transfer() throws IOException {
- // check if we should penalize the request
- if (!isSuccess()) {
- request = session.penalize(request);
+ // cleanup response flowfile, if applicable
+ try {
+ if (responseFlowFile != null) {
+ session.remove(responseFlowFile);
+ }
+ } catch (final Exception e1) {
+ logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
}
+ }
+ }
- // log the status codes from the response
- logger.info("Request to {} returned status code {} for {}",
- new Object[] {conn.getURL().toExternalForm(), statusCode, request});
- // transfer to the correct relationship
- // 2xx -> SUCCESS
- if (isSuccess()) {
- // we have two flowfiles to transfer
- session.transfer(request, REL_SUCCESS_REQ);
- session.transfer(response, REL_SUCCESS_RESP);
+ private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) {
+ Request.Builder requestBuilder = new Request.Builder();
- // 5xx -> RETRY
- } else if (statusCode / 100 == 5) {
- session.transfer(request, REL_RETRY);
+ requestBuilder = requestBuilder.url(url);
+ final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
- // 1xx, 3xx, 4xx -> NO RETRY
- } else {
- session.transfer(request, REL_NO_RETRY);
- }
+ // If the username/password properties are set then check if digest auth is being used
+ if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
+ final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
+ String credential = com.squareup.okhttp.Credentials.basic(authUser, authPass);
+ requestBuilder = requestBuilder.header("Authorization", credential);
}
- private void setRequestProperties() {
+ // set the request method
+ String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
+ switch (method) {
+ case "GET":
+ requestBuilder = requestBuilder.get();
+ break;
+ case "POST":
+ RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile);
+ requestBuilder = requestBuilder.post(requestBody);
+ break;
+ case "PUT":
+ requestBody = getRequestBodyToSend(session, requestFlowFile);
+ requestBuilder = requestBuilder.put(requestBody);
+ break;
+ case "HEAD":
+ requestBuilder = requestBuilder.head();
+ break;
+ case "DELETE":
+ requestBuilder = requestBuilder.delete();
+ break;
+ default:
+ requestBuilder = requestBuilder.method(method, null);
+ }
- // check if we should send the a Date header with the request
- if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
- conn.setRequestProperty("Date", getDateValue());
- }
+ requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile);
- // iterate through the flowfile attributes, adding any attribute that
- // matches the attributes-to-send pattern. if the pattern is not set
- // (it's an optional property), ignore that attribute entirely
- if (attributesToSend != null) {
- Map<String, String> attributes = request.getAttributes();
- Matcher m = attributesToSend.matcher("");
- for (Map.Entry<String, String> entry : attributes.entrySet()) {
- String key = trimToEmpty(entry.getKey());
- String val = trimToEmpty(entry.getValue());
-
- // don't include any of the ignored attributes
- if (IGNORED_ATTRIBUTES.contains(key)) {
- continue;
- }
+ return requestBuilder.build();
+ }
- // check if our attribute key matches the pattern
- // if so, include in the request as a header
- m.reset(key);
- if (m.matches()) {
- conn.setRequestProperty(key, val);
- }
- }
+ private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) {
+ return new RequestBody() {
+ @Override
+ public MediaType contentType() {
+ final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ String contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue;
+ return MediaType.parse(contentType);
}
+
+ @Override
+ public void writeTo(BufferedSink sink) throws IOException {
+ session.exportTo(requestFlowFile, sink.outputStream());
+ }
+ };
+ }
+
+ private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {
+ // check if we should send the a Date header with the request
+ if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
+ requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis()));
+ }
+
+ for (String headerKey : dynamicPropertyNames) {
+ String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue();
+ requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
}
- /**
- * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
- */
- private Map<String, String> convertAttributesFromHeaders() throws IOException {
- // create a new hashmap to store the values from the connection
- Map<String, String> map = new HashMap<>();
- for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) {
- String key = entry.getKey();
- if (key == null) {
+ // iterate through the flowfile attributes, adding any attribute that
+ // matches the attributes-to-send pattern. if the pattern is not set
+ // (it's an optional property), ignore that attribute entirely
+ if (regexAttributesToSend != null) {
+ Map<String, String> attributes = requestFlowFile.getAttributes();
+ Matcher m = regexAttributesToSend.matcher("");
+ for (Map.Entry<String, String> entry : attributes.entrySet()) {
+ String headerKey = trimToEmpty(entry.getKey());
+
+ // don't include any of the ignored attributes
+ if (IGNORED_ATTRIBUTES.contains(headerKey)) {
continue;
}
- List<String> values = entry.getValue();
-
- // we ignore any headers with no actual values (rare)
- if (values == null || values.isEmpty()) {
- continue;
+ // check if our attribute key matches the pattern
+ // if so, include in the request as a header
+ m.reset(headerKey);
+ if (m.matches()) {
+ String headerVal = trimToEmpty(entry.getValue());
+ requestBuilder = requestBuilder.addHeader(headerKey, headerVal);
}
+ }
+ }
+ return requestBuilder;
+ }
- // create a comma separated string from the values, this is stored in the map
- String value = csv(values);
- // put the csv into the map
- map.put(key, value);
+ private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){
+ // check if we should penalize the request
+ if (!isSuccess(statusCode)) {
+ if (request == null) {
+ context.yield();
+ } else {
+ request = session.penalize(request);
}
+ }
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection sconn = (HttpsURLConnection) conn;
- // this should seemingly not be required, but somehow the state of the jdk client is messed up
- // when retrieving SSL certificate related information if connect() has not been called previously.
- sconn.connect();
- map.put(REMOTE_DN, sconn.getPeerPrincipal().getName());
+ // If the property to output the response flowfile regardless of status code is set then transfer it
+ boolean responseSent = false;
+ if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
+ session.transfer(response, REL_RESPONSE);
+ responseSent = true;
+ }
+
+ // transfer to the correct relationship
+ // 2xx -> SUCCESS
+ if (isSuccess(statusCode)) {
+ // we have two flowfiles to transfer
+ if (request != null) {
+ session.transfer(request, REL_SUCCESS_REQ);
+ }
+ if (response != null && !responseSent) {
+ session.transfer(response, REL_RESPONSE);
}
- return map;
- }
+ // 5xx -> RETRY
+ } else if (statusCode / 100 == 5) {
+ if (request != null) {
+ session.transfer(request, REL_RETRY);
+ }
- private boolean isSuccess() throws IOException {
- if (statusCode == 0) {
- throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
+ // 1xx, 3xx, 4xx -> NO RETRY
+ } else {
+ if (request != null) {
+ session.transfer(request, REL_NO_RETRY);
}
- return statusCode / 100 == 2;
}
- private void logRequest() {
- logger.debug("\nRequest to remote service:\n\t{}\n{}",
- new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
- }
+ }
- private void logResponse() {
- logger.debug("\nResponse from remote service:\n\t{}\n{}",
- new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
- }
+ private boolean isSuccess(int statusCode) {
+ return statusCode / 100 == 2;
+ }
- private String getLogString(Map<String, List<String>> map) {
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<String, List<String>> entry : map.entrySet()) {
- List<String> list = entry.getValue();
- if (list.isEmpty()) {
- continue;
- }
- sb.append("\t");
- sb.append(entry.getKey());
- sb.append(": ");
- if (list.size() == 1) {
- sb.append(list.get(0));
- } else {
- sb.append(list.toString());
- }
- sb.append("\n");
- }
- return sb.toString();
- }
+ private void logRequest(ProcessorLog logger, Request request) {
+ logger.debug("\nRequest to remote service:\n\t{}\n{}",
+ new Object[]{request.url().toExternalForm(), getLogString(request.headers().toMultimap())});
+ }
- /**
- * Convert a collection of string values into a overly simple comma separated string.
- *
- * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
- */
- private String csv(Collection<String> values) {
- if (values == null || values.isEmpty()) {
- return "";
- }
- if (values.size() == 1) {
- return values.iterator().next();
- }
+ private void logResponse(ProcessorLog logger, URL url, Response response) {
+ logger.debug("\nResponse from remote service:\n\t{}\n{}",
+ new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())});
+ }
- StringBuilder sb = new StringBuilder();
- for (String value : values) {
- value = value.trim();
- if (value.isEmpty()) {
- continue;
- }
- if (sb.length() > 0) {
- sb.append(", ");
- }
- sb.append(value);
+ private String getLogString(Map<String, List<String>> map) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+ List<String> list = entry.getValue();
+ if (list.isEmpty()) {
+ continue;
+ }
+ sb.append("\t");
+ sb.append(entry.getKey());
+ sb.append(": ");
+ if (list.size() == 1) {
+ sb.append(list.get(0));
+ } else {
+ sb.append(list.toString());
}
- return sb.toString().trim();
+ sb.append("\n");
}
+ return sb.toString();
+ }
- /**
- * Return the current datetime as an RFC 1123 formatted string in the GMT tz.
- */
- private String getDateValue() {
- return dateFormat.print(System.currentTimeMillis());
+ /**
+ * Convert a collection of string values into a overly simple comma separated string.
+ * <p/>
+ * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
+ */
+ private String csv(Collection<String> values) {
+ if (values == null || values.isEmpty()) {
+ return "";
+ }
+ if (values.size() == 1) {
+ return values.iterator().next();
}
- /**
- * Returns a string from the input stream using the specified character encoding.
- */
- private String toString(InputStream is, Charset charset) throws IOException {
- if (is == null) {
- return "";
+ StringBuilder sb = new StringBuilder();
+ for (String value : values) {
+ value = value.trim();
+ if (value.isEmpty()) {
+ continue;
}
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buf = new byte[4096];
- int len;
- while ((len = is.read(buf)) != -1) {
- out.write(buf, 0, len);
+ if (sb.length() > 0) {
+ sb.append(", ");
}
- return new String(out.toByteArray(), charset);
+ sb.append(value);
}
+ return sb.toString().trim();
+ }
- /**
- * Returns the input stream to use for reading from the remote server. We're either going to want the inputstream or errorstream, effectively depending on the status code.
- * <p>
- * This method can return null if there is no inputstream to read from. For example, if the remote server did not send a message body. eg. 204 No Content or 304 Not Modified
- */
- private InputStream getResponseStream() {
- try {
- InputStream is = conn.getErrorStream();
- if (is == null) {
- is = conn.getInputStream();
- }
- return new BufferedInputStream(is);
+ /**
+ * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
+ */
+ private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp){
+ // create a new hashmap to store the values from the connection
+ Map<String, String> map = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry : responseHttp.headers().toMultimap().entrySet()) {
+ String key = entry.getKey();
+ if (key == null) {
+ continue;
+ }
- } catch (IOException e) {
- logger.warn("Response stream threw an exception: {}", new Object[] {e}, e);
- return null;
+ List<String> values = entry.getValue();
+
+ // we ignore any headers with no actual values (rare)
+ if (values == null || values.isEmpty()) {
+ continue;
}
+
+ // create a comma separated string from the values, this is stored in the map
+ String value = csv(values);
+
+ // put the csv into the map
+ map.put(key, value);
}
- /**
- * Writes the status attributes onto the flowfile, returning the flowfile that was updated.
- */
- private FlowFile writeStatusAttributes(FlowFile flowfile) {
- flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode));
- flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, statusMessage);
- flowfile = session.putAttribute(flowfile, REQUEST_URL, conn.getURL().toExternalForm());
- flowfile = session.putAttribute(flowfile, TRANSACTION_ID, Long.toString(txId));
- return flowfile;
+ if ("HTTPS".equals(url.getProtocol().toUpperCase())) {
+ map.put(REMOTE_DN, responseHttp.handshake().peerPrincipal().getName());
}
- /**
- *
- */
- private static class OverrideHostnameVerifier implements HostnameVerifier {
+ return map;
+ }
- private final String trustedHostname;
- private final HostnameVerifier delegate;
+ private Charset getCharsetFromMediaType(MediaType contentType) {
+ return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8;
+ }
- private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
- this.trustedHostname = trustedHostname;
- this.delegate = delegate;
- }
+ private static class OverrideHostnameVerifier implements HostnameVerifier {
- @Override
- public boolean verify(String hostname, SSLSession session) {
- if (trustedHostname.equalsIgnoreCase(hostname)) {
- return true;
- }
- return delegate.verify(hostname, session);
+ private final String trustedHostname;
+ private final HostnameVerifier delegate;
+
+ private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
+ this.trustedHostname = trustedHostname;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean verify(String hostname, SSLSession session) {
+ if (trustedHostname.equalsIgnoreCase(hostname)) {
+ return true;
}
+ return delegate.verify(hostname, session);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index a26b2ed..a82bc5a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -19,22 +19,27 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
+import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+
public class TestInvokeHTTP extends TestInvokeHttpCommon {
@BeforeClass
@@ -72,41 +77,99 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
return new TestServer();
}
+ @Test
+ public void testSslSetHttpRequest() throws Exception {
+
+ final Map<String, String> sslProperties = new HashMap<>();
+ sslProperties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
+ sslProperties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
+ sslProperties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+ sslProperties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
+ sslProperties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
+ sslProperties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+
+ runner = TestRunners.newTestRunner(InvokeHTTP.class);
+ final StandardSSLContextService sslService = new StandardSSLContextService();
+ runner.addControllerService("ssl-context", sslService, sslProperties);
+ runner.enableControllerService(sslService);
+ runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ bundle1.assertAttributeEquals("OkHttp-Selected-Protocol", "http/1.1");
+ }
+
// Currently InvokeHttp does not support Proxy via Https
@Test
public void testProxy() throws Exception {
addHandler(new MyProxyHandler());
URL proxyURL = new URL(url);
- runner.setProperty(InvokeHTTP.Config.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
- runner.setProperty(InvokeHTTP.Config.PROP_PROXY_HOST, proxyURL.getHost());
- runner.setProperty(InvokeHTTP.Config.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
+ runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
+ runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, proxyURL.getHost());
+
+ try{
+ runner.run();
+ Assert.fail();
+ } catch (AssertionError e){
+ // Expect assetion error when proxy port isn't set but host is.
+ }
+ runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
//expected in request status.code and status.message
//original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
//expected in response
//status code, status message, all headers from server response --> ff attributes
//server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("http://nifi.apache.org/".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
index d155b74..b3cd9dc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
-import org.apache.nifi.processors.standard.InvokeHTTP.Config;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunners;
@@ -63,7 +62,7 @@ public class TestInvokeHttpSSL extends TestInvokeHttpCommon {
final StandardSSLContextService sslService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslService, sslProperties);
runner.enableControllerService(sslService);
- runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+ runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
server.clearHandlers();
}