You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/11/19 07:43:12 UTC

[2/2] nifi git commit: NIFI-1086 Provide refactoring of InvokeHTTP NIFI-980 Add support for HTTP Digest authentication to InvokeHttp NIFI-1080 Provide additional InvokeHttp unit tests NIFI-1133 InvokeHTTP Processor does not save Location header for 3xx r

NIFI-1086 Provide refactoring of InvokeHTTP
NIFI-980 Add support for HTTP Digest authentication to InvokeHttp
NIFI-1080 Provide additional InvokeHttp unit tests
NIFI-1133 InvokeHTTP Processor does not save Location header for 3xx responses
NIFI-1009 InvokeHTTP should be able to be scheduled without any incoming connection for GET operations
NIFI-61 Multiple improvements for InvokeHTTP inclusive of providing unique tx.id across clusters, dynamic HTTP header properties

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8c2323dc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8c2323dc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8c2323dc

Branch: refs/heads/master
Commit: 8c2323dc8d0e107f1a99898370c7515fa9603122
Parents: fb335ea
Author: Joseph Percivall <jo...@yahoo.com>
Authored: Mon Nov 2 15:45:20 2015 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Nov 19 01:40:21 2015 -0500

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   52 +-
 .../nifi/processors/standard/InvokeHTTP.java    | 1119 ++++++++++--------
 .../processors/standard/TestInvokeHTTP.java     |   91 +-
 .../processors/standard/TestInvokeHttpSSL.java  |    3 +-
 .../standard/util/TestInvokeHttpCommon.java     | 1029 ++++++++++++----
 pom.xml                                         |   38 +
 6 files changed, 1603 insertions(+), 729 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 76f9daf..0427927 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -126,11 +126,6 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-socket-utils</artifactId>
         </dependency>
         <dependency>
@@ -138,11 +133,6 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-load-distribution-service-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-distributed-cache-client-service</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
@@ -154,21 +144,11 @@ language governing permissions and limitations under the License. -->
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-client</artifactId>
         </dependency>
-		<dependency>
-			<groupId>org.apache.activemq</groupId>
-			<artifactId>activemq-broker</artifactId>
-			<scope>test</scope>
-		</dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.tika</groupId>
             <artifactId>tika-core</artifactId>
             <version>1.7</version>
@@ -189,7 +169,37 @@ language governing permissions and limitations under the License. -->
 			<groupId>org.codehaus.jackson</groupId>
 			<artifactId>jackson-mapper-asl</artifactId>
 		</dependency>
-
+        <dependency>
+            <groupId>com.squareup.okhttp</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>2.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.burgstaller</groupId>
+            <artifactId>okhttp-digest</artifactId>
+            <version>0.4</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index d827658..2a9760d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -18,13 +18,8 @@ package org.apache.nifi.processors.standard;
 
 import static org.apache.commons.lang3.StringUtils.trimToEmpty;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.Proxy;
 import java.net.Proxy.Type;
@@ -41,30 +36,49 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSession;
 
-import org.apache.commons.codec.binary.Base64;
+import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
+import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
+import com.burgstaller.okhttp.DispatchingAuthenticator;
+import com.burgstaller.okhttp.digest.CachingAuthenticator;
+import com.burgstaller.okhttp.digest.DigestAuthenticator;
+
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+
+import com.squareup.okhttp.ResponseBody;
+import okio.BufferedSink;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -72,118 +86,61 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 @SupportsBatching
 @Tags({"http", "https", "rest", "client"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@TriggerWhenEmpty
 @CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.")
 @WritesAttributes({
     @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
     @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
-    @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
+    @WritesAttribute(attribute = "invokehttp.response.body", description = "In the instance where the status code received is not a success (2xx) "
+        + "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."),
     @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
     @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
-    @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")})
-@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
-    + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
+    @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server"),
+    @WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+        + "will become the attribute key and the value would be the body of the HTTP response.")})
+@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", supportsExpressionLanguage = true, description = "Send request header "
+        + "with a key matching the Dynamic Property Key and a value created by evaluating the Attribute Expression Language set in the value "
+        + "of the Dynamic Property.")
 public final class InvokeHTTP extends AbstractProcessor {
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Config.PROPERTIES;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
-        if (Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName)) {
-            return Config.PROP_TRUSTED_HOSTNAME;
-        }
-        return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return Config.RELATIONSHIPS;
-    }
-
-    private volatile Pattern attributesToSend = null;
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        final String trimmedValue = StringUtils.trimToEmpty(newValue);
-
-        // compile the attributes-to-send filter pattern
-        if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
-            if (newValue.isEmpty()) {
-                attributesToSend = null;
-            } else {
-                attributesToSend = Pattern.compile(trimmedValue);
-            }
-        }
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>(1);
-        final boolean proxyHostSet = validationContext.getProperty(Config.PROP_PROXY_HOST).isSet();
-        final boolean proxyPortSet = validationContext.getProperty(Config.PROP_PROXY_PORT).isSet();
-
-        if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
-            results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
-        }
-
-        return results;
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final SSLContextService sslService = context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
-
-        Transaction transaction = new Transaction(getLogger(), sslContext, attributesToSend, context, session, flowFile);
-        transaction.process();
-    }
-
-    /**
-     * Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc.
-     */
-    public interface Config {
-        // flowfile attribute keys returned after reading the response
-        String STATUS_CODE = "invokehttp.status.code";
-        String STATUS_MESSAGE = "invokehttp.status.message";
-        String RESPONSE_BODY = "invokehttp.response.body";
-        String REQUEST_URL = "invokehttp.request.url";
-        String TRANSACTION_ID = "invokehttp.tx.id";
-        String REMOTE_DN = "invokehttp.remote.dn";
-
-        // Set of flowfile attributes which we generally always ignore during
-        // processing, including when converting http headers, copying attributes, etc.
-        // This set includes our strings defined above as well as some standard flowfile
-        // attributes.
-        public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+    // flowfile attribute keys returned after reading the response
+    public final static String STATUS_CODE = "invokehttp.status.code";
+    public final static String STATUS_MESSAGE = "invokehttp.status.message";
+    public final static String RESPONSE_BODY = "invokehttp.response.body";
+    public final static String REQUEST_URL = "invokehttp.request.url";
+    public final static String TRANSACTION_ID = "invokehttp.tx.id";
+    public final static String REMOTE_DN = "invokehttp.remote.dn";
+
+    // Set of flowfile attributes which we generally always ignore during
+    // processing, including when converting http headers, copying attributes, etc.
+    // This set includes our strings defined above as well as some standard flowfile
+    // attributes.
+    public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
             STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
             "uuid", "filename", "path")));
 
-        // properties
-        public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
+    // properties
+    public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
             .name("HTTP Method")
-            .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
+            .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported but will "
+                    + "be sent without a message body.")
             .required(true)
             .defaultValue("GET")
             .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
             .build();
 
-        public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
             .name("Remote URL")
             .description("Remote URL which will be connected to, including scheme, host, port, path.")
             .required(true)
@@ -191,7 +148,7 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.URL_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Connection Timeout")
             .description("Max wait time for connection to remote service.")
             .required(true)
@@ -199,7 +156,7 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
             .name("Read Timeout")
             .description("Max wait time for response from remote service.")
             .required(true)
@@ -207,7 +164,7 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
             .name("Include Date Header")
             .description("Include an RFC-2616 Date header in the request.")
             .required(true)
@@ -216,7 +173,7 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
             .name("Follow Redirects")
             .description("Follow HTTP redirects issued by remote server.")
             .required(true)
@@ -225,48 +182,50 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
             .name("Attributes to Send")
             .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
-                    + "If not defined, no attributes are sent as headers.")
+                    + "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. "
+                    + "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression "
+                    + "language will be the header value.")
             .required(false)
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
             .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
 
-        public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
             .name("Proxy Host")
             .description("The fully qualified hostname or IP address of the proxy server")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-        public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder()
             .name("Proxy Port")
             .description("The port of the proxy server")
             .required(false)
             .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
 
-        // Per RFC 7235, 2617, and 2616.
-        // basic-credentials = base64-user-pass
-        // base64-user-pass = userid ":" password
-        // userid = *<TEXT excluding ":">
-        // password = *TEXT
-        //
-        // OCTET = <any 8-bit sequence of data>
-        // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
-        // LWS = [CRLF] 1*( SP | HT )
-        // TEXT = <any OCTET except CTLs but including LWS>
-        //
-        // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
-        public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
+    // Per RFC 7235, 2617, and 2616.
+    // basic-credentials = base64-user-pass
+    // base64-user-pass = userid ":" password
+    // userid = *<TEXT excluding ":">
+    // password = *TEXT
+    //
+    // OCTET = <any 8-bit sequence of data>
+    // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
+    // LWS = [CRLF] 1*( SP | HT )
+    // TEXT = <any OCTET except CTLs but including LWS>
+    //
+    // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
+    public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
             .name("Basic Authentication Username")
             .displayName("Basic Authentication Username")
             .description("The username to be used by the client to authenticate against the Remote URL.  Cannot include control characters (0-31), ':', or DEL (127).")
@@ -274,7 +233,7 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
             .build();
 
-        public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
             .name("Basic Authentication Password")
             .displayName("Basic Authentication Password")
             .description("The password to be used by the client to authenticate against the Remote URL.")
@@ -283,7 +242,64 @@ public final class InvokeHTTP extends AbstractProcessor {
             .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
             .build();
 
-        public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+    public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("Put Response Body In Attribute")
+            .description("If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate "
+                    + "FlowFile. The attribute key to put to is determined by evaluating value of this property. ")
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
+            .name("Max Length To Put In Attribute")
+            .description("If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" "
+                    + "property or by receiving an error status code), the number of characters put to the attribute value will be at "
+                    + "most this amount. This is important because attributes are held in memory and large attributes will quickly "
+                    + "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. "
+                    + "Consider making this smaller if able.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("256")
+            .build();
+
+    public static final PropertyDescriptor PROP_DIGEST_AUTH = new PropertyDescriptor.Builder()
+            .name("Digest Authentication")
+            .displayName("Use Digest Authentication")
+            .description("Whether to communicate with the website using Digest Authentication. 'Basic Authentication Username' and 'Basic Authentication Password' are used "
+                    + "for authentication.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder()
+            .name("Always Output Response")
+            .description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is "
+                    + "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the "
+                    + "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Trusted Hostname")
+            .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. "
+                    + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based "
+                    + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder()
+            .name("Add Response Headers to Request")
+            .description("Enabling this property saves all the response headers to the original request. This may be when the response headers are needed "
+                    + "but a response is not generated due to the status code received.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             PROP_METHOD,
             PROP_URL,
             PROP_SSL_CONTEXT_SERVICE,
@@ -295,499 +311,626 @@ public final class InvokeHTTP extends AbstractProcessor {
             PROP_BASIC_AUTH_USERNAME,
             PROP_BASIC_AUTH_PASSWORD,
             PROP_PROXY_HOST,
-            PROP_PROXY_PORT));
-
-        // property to allow the hostname verifier to be overridden
-        // this is a "hidden" property - it's configured using a dynamic user property
-        public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
-            .name("Trusted Hostname")
-            .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. "
-                    + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based "
-                    + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.")
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .dynamic(true)
-            .build();
-
-        // relationships
-        public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
+            PROP_PROXY_PORT,
+            PROP_PUT_OUTPUT_IN_ATTRIBUTE,
+            PROP_PUT_ATTRIBUTE_MAX_LENGTH,
+            PROP_DIGEST_AUTH,
+            PROP_OUTPUT_RESPONSE_REGARDLESS,
+            PROP_TRUSTED_HOSTNAME,
+            PROP_ADD_HEADERS_TO_REQUEST));
+
+    // relationships
+    public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
             .name("Original")
-                .description("Original FlowFile will be routed upon success (2xx status codes).")
+            .description("The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the "
+                    + "success of the request.")
             .build();
 
-        public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder()
+    public static final Relationship REL_RESPONSE = new Relationship.Builder()
             .name("Response")
-                .description("Response FlowFile will be routed upon success (2xx status codes).")
+            .description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response Regardless' property "
+                    + "is true then the response will be sent to this relationship regardless of the status code received.")
             .build();
 
-        public static final Relationship REL_RETRY = new Relationship.Builder()
+    public static final Relationship REL_RETRY = new Relationship.Builder()
             .name("Retry")
-                .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
+            .description("The original FlowFile will be routed on any status code that can be retried (5xx status codes). It will have new "
+                    + "attributes detailing the request.")
             .build();
 
-        public static final Relationship REL_NO_RETRY = new Relationship.Builder()
+    public static final Relationship REL_NO_RETRY = new Relationship.Builder()
             .name("No Retry")
-                .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
+            .description("The original FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).  "
+                    + "It will have new attributes detailing the request.")
             .build();
 
-        public static final Relationship REL_FAILURE = new Relationship.Builder()
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("Failure")
-                .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
+            .description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. "
+                    + "It will have new attributes detailing the request.")
             .build();
 
-        public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-            REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
 
-    }
+    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
 
     /**
-     * A single invocation of an HTTP request/response from the InvokeHTTP processor. This class encapsulates the entirety of the flowfile processing.
-     * <p>
-     * This class is not thread safe and is created new for every flowfile processed.
+     * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
+     * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
      */
-    private static class Transaction implements Config {
-
-        /**
-         * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
-         * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
-         */
-        private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
-        private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC();
-
-        /**
-         * Every request/response cycle from this client has a unique transaction id which will be stored as a flowfile attribute. This generator is used to create the id.
-         */
-        private static final AtomicLong txIdGenerator = new AtomicLong();
-
-        private static final Charset utf8 = Charset.forName("UTF-8");
-
-        private final ProcessorLog logger;
-        private final SSLContext sslContext;
-        private final Pattern attributesToSend;
-        private final ProcessContext context;
-        private final ProcessSession session;
-
-        private final long txId = txIdGenerator.incrementAndGet();
-        private final long startNanos = System.nanoTime();
-
-        private FlowFile request;
-        private FlowFile response;
-        private HttpURLConnection conn;
-
-        private int statusCode;
-        private String statusMessage;
-
-        public Transaction(
-            final ProcessorLog logger,
-            final SSLContext sslContext,
-            final Pattern attributesToSend,
-            final ProcessContext context,
-            final ProcessSession session,
-            final FlowFile request) {
-
-            this.logger = logger;
-            this.sslContext = sslContext;
-            this.attributesToSend = attributesToSend;
-            this.context = context;
-            this.session = session;
-            this.request = request;
-        }
+    private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
+    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC();
 
+    private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
 
-        public void process() {
-            try {
-                openConnection();
-                sendRequest();
-                readResponse();
-                transfer();
-            } catch (final Exception e) {
-                // log exception
-                logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e);
-
-                // penalize
-                request = session.penalize(request);
+    public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
 
-                // transfer original to failure
-                session.transfer(request, REL_FAILURE);
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
 
-                // cleanup response flowfile, if applicable
-                try {
-                    if (response != null) {
-                        session.remove(response);
-                    }
-                } catch (final Exception e1) {
-                    logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1);
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .required(false)
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                .dynamic(true)
+                .expressionLanguageSupported(true)
+                .build();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile Pattern regexAttributesToSend = null;
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.isDynamic()) {
+            final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
+            if (newValue == null) {
+                newDynamicPropertyNames.remove(descriptor.getName());
+            } else if (oldValue == null) {    // new property
+                newDynamicPropertyNames.add(descriptor.getName());
+            }
+            this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
+        } else {
+            // compile the attributes-to-send filter pattern
+            if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
+                if (newValue.isEmpty()) {
+                    regexAttributesToSend = null;
+                } else {
+                    final String trimmedValue = StringUtils.trimToEmpty(newValue);
+                    regexAttributesToSend = Pattern.compile(trimmedValue);
                 }
             }
         }
+    }
 
-        private void openConnection() throws IOException {
-            // read the url property from the context
-            final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
-            final URL url = new URL(urlstr);
-            final String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
-            final String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
-
-            String authstrencoded = null;
-            if (!authuser.isEmpty()) {
-                String authstr = authuser + ":" + authpass;
-                byte[] bytestrencoded = Base64.encodeBase64(authstr.getBytes(StandardCharsets.UTF_8));
-                authstrencoded = new String(bytestrencoded, StandardCharsets.UTF_8);
-            }
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(1);
+        final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet();
+        final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet();
 
-            // create the connection
-            final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
-            final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
-            if (proxyHost == null || proxyPort == null) {
-                conn = (HttpURLConnection) url.openConnection();
-            } else {
-                final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
-                conn = (HttpURLConnection) url.openConnection(proxy);
-            }
+        if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+            results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+        }
 
-            if (authstrencoded != null) {
-                conn.setRequestProperty("Authorization", "Basic " + authstrencoded);
-            }
+        return results;
+    }
 
-            // set the request method
-            String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase();
-            conn.setRequestMethod(method);
+    @OnScheduled
+    public void setUpClient(final ProcessContext context) throws IOException {
+        okHttpClientAtomicReference.set(null);
 
-            // set timeouts
-            conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-            conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        OkHttpClient okHttpClient = new OkHttpClient();
 
-            // set whether to follow redirects
-            conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
+        // Add a proxy if set
+        final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
+        final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
+        if (proxyHost != null && proxyPort != null) {
+            final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+            okHttpClient.setProxy(proxy);
+        }
 
-            // special handling for https
-            if (conn instanceof HttpsURLConnection) {
-                HttpsURLConnection sconn = (HttpsURLConnection) conn;
+        // Set timeouts
+        okHttpClient.setConnectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
+        okHttpClient.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
 
-                // check if the ssl context is set
-                if (sslContext != null) {
-                    sconn.setSSLSocketFactory(sslContext.getSocketFactory());
-                }
+        // Set whether to follow redirects
+        okHttpClient.setFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
 
-                // check the trusted hostname property and override the HostnameVerifier
-                String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
-                if (!trustedHostname.isEmpty()) {
-                    sconn.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier()));
-                }
-            }
+        final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
 
+        // check if the ssl context is set and add the factory if so
+        if (sslContext != null) {
+            okHttpClient.setSslSocketFactory(sslContext.getSocketFactory());
         }
 
-        private void sendRequest() throws IOException {
-            // set the http request properties using flowfile attribute values
-            setRequestProperties();
+        // check the trusted hostname property and override the HostnameVerifier
+        String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
+        if (!trustedHostname.isEmpty()) {
+            okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier()));
+        }
 
-            // log request
-            logRequest();
+        final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
 
-            // we only stream data for POST and PUT requests
-            String method = conn.getRequestMethod().toUpperCase();
-            if ("POST".equals(method) || "PUT".equals(method)) {
-                conn.setDoOutput(true);
-                conn.setFixedLengthStreamingMode(request.getSize());
+        // If the username/password properties are set then check if digest auth is being used
+        if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
+            final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
 
-                // write the flowfile contents to the output stream
-                try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) {
-                    session.exportTo(request, os);
-                }
+            /*
+             * Currently OkHttp doesn't have built-in Digest Auth Support. The ticket for adding it is here:
+             * https://github.com/square/okhttp/issues/205#issuecomment-154047052
+             * Once added this should be refactored to use the built in support. For now, a third party lib is needed.
+             */
+            final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>();
+
+            com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
+
+            final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
+
+            DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder()
+                    .with("Digest", digestAuthenticator)
+                    .build();
+
+            okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache));
+            okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache));
+        }
 
-                // emit provenance event
-                session.getProvenanceReporter().send(request, conn.getURL().toExternalForm());
+        okHttpClientAtomicReference.set(okHttpClient);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        OkHttpClient okHttpClient = okHttpClientAtomicReference.get();
+
+        FlowFile requestFlowFile = session.get();
+
+        // Checking to see if the property to put the body of the response in an attribute was set
+        boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet();
+        if (requestFlowFile == null) {
+            if(context.hasNonLoopConnection()){
+                return;
             }
 
+            String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase();
+            if ("POST".equals(request) || "PUT".equals(request)) {
+                return;
+            } else if (putToAttribute) {
+                requestFlowFile = session.create();
+            }
         }
 
-        private void readResponse() throws IOException {
+        // Setting some initial variables
+        final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+        final ProcessorLog logger = getLogger();
+
+        // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
+        final UUID txId = UUID.randomUUID();
+
+        FlowFile responseFlowFile = null;
+        try {
+            // read the url property from the context
+            final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
+            final URL url = new URL(urlstr);
+
+            Request httpRequest = configureRequest(context, session, requestFlowFile, url);
+
+            // log request
+            logRequest(logger, httpRequest);
+
+            // emit send provenance event if successfully sent to the server
+            if (httpRequest.body() != null) {
+                session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true);
+            }
+
+            final long startNanos = System.nanoTime();
+            Response responseHttp = okHttpClient.newCall(httpRequest).execute();
 
             // output the raw response headers (DEBUG level only)
-            logResponse();
+            logResponse(logger, url, responseHttp);
 
             // store the status code and message
-            statusCode = conn.getResponseCode();
-            statusMessage = conn.getResponseMessage();
+            int statusCode = responseHttp.code();
+            String statusMessage = responseHttp.message();
+
+            if (statusCode == 0) {
+                throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
+            }
+
+            // Create a map of the status attributes that are always written to the request and reponse FlowFiles
+            Map<String, String> statusAttributes = new HashMap<>();
+            statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
+            statusAttributes.put(STATUS_MESSAGE, statusMessage);
+            statusAttributes.put(REQUEST_URL, url.toExternalForm());
+            statusAttributes.put(TRANSACTION_ID, txId.toString());
 
-            // always write the status attributes to the request flowfile
-            request = writeStatusAttributes(request);
+            if (requestFlowFile != null) {
+                requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
+            }
 
-            // read from the appropriate input stream
-            try (InputStream is = getResponseStream()) {
+            // If the property to add the response headers to the request flowfile is true then add them
+            if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
+                // write the response headers as attributes
+                // this will overwrite any existing flowfile attributes
+                requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
+            }
 
-                // if not successful, store the response body into a flowfile attribute
-                if (!isSuccess()) {
-                    String body = trimToEmpty(toString(is, utf8));
-                    request = session.putAttribute(request, RESPONSE_BODY, body);
+            boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
+            boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean();
+            ResponseBody responseBody = responseHttp.body();
+            boolean bodyExists = responseBody != null;
+
+            InputStream responseBodyStream = null;
+            SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
+            TeeInputStream teeInputStream = null;
+            try {
+                responseBodyStream = bodyExists ? responseBody.byteStream() : null;
+                if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
+                    outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
+                    teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
                 }
 
-                // if successful, store the response body as the flowfile payload
-                // we include additional flowfile attributes including the reponse headers
-                // and the status codes.
-                if (isSuccess()) {
+                if (outputBodyToResponseContent) {
+                    /*
+                     * If successful and putting to response flowfile, store the response body as the flowfile payload
+                     * we include additional flowfile attributes including the response headers and the status codes.
+                     */
+
                     // clone the flowfile to capture the response
-                    response = session.create(request);
+                    if (requestFlowFile != null) {
+                        responseFlowFile = session.create(requestFlowFile);
+                    } else {
+                        responseFlowFile = session.create();
+                    }
 
                     // write the status attributes
-                    response = writeStatusAttributes(response);
+                    responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
 
                     // write the response headers as attributes
                     // this will overwrite any existing flowfile attributes
-                    response = session.putAllAttributes(response, convertAttributesFromHeaders());
+                    responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
 
                     // transfer the message body to the payload
                     // can potentially be null in edge cases
-                    if (is != null) {
-                        response = session.importFrom(is, response);
+                    if (bodyExists) {
+                        if (teeInputStream != null) {
+                            responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
+                        } else {
+                            responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
+                        }
 
                         // emit provenance event
                         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                        session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis);
+                        session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
+                    }
+                }
+
+                // if not successful and request flowfile is not null, store the response body into a flowfile attribute
+                if (outputBodyToRequestAttribute && bodyExists) {
+                    String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue();
+                    if (attributeKey == null) {
+                        attributeKey = RESPONSE_BODY;
                     }
+                    byte[] outputBuffer;
+                    int size;
+
+                    if (outputStreamToRequestAttribute != null) {
+                        outputBuffer = outputStreamToRequestAttribute.getBuffer();
+                        size = outputStreamToRequestAttribute.size();
+                    } else {
+                        outputBuffer = new byte[maxAttributeSize];
+                        size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
+                    }
+                    String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
+                    requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
 
+                    final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to "
+                            + url.toExternalForm() + ". It took " + millis + "millis,");
+                }
+            } finally {
+                if(outputStreamToRequestAttribute != null){
+                    outputStreamToRequestAttribute.close();
+                    outputStreamToRequestAttribute = null;
                 }
+                if(teeInputStream != null){
+                    teeInputStream.close();
+                    teeInputStream = null;
+                } else if(responseBodyStream != null){
+                    responseBodyStream.close();
+                    responseBodyStream = null;
+                }
+            }
 
+            route(requestFlowFile, responseFlowFile, session, context, statusCode);
+        } catch (final Exception e) {
+            // penalize or yield
+            if (requestFlowFile != null) {
+                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+                requestFlowFile = session.penalize(requestFlowFile);
+                // transfer original to failure
+                session.transfer(requestFlowFile, REL_FAILURE);
+            } else {
+                logger.error("Yielding processor due to exception encountered as a source processor: {}", e);
+                context.yield();
             }
 
-        }
 
-        private void transfer() throws IOException {
-            // check if we should penalize the request
-            if (!isSuccess()) {
-                request = session.penalize(request);
+            // cleanup response flowfile, if applicable
+            try {
+                if (responseFlowFile != null) {
+                    session.remove(responseFlowFile);
+                }
+            } catch (final Exception e1) {
+                logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
             }
+        }
+    }
 
-            // log the status codes from the response
-            logger.info("Request to {} returned status code {} for {}",
-                new Object[] {conn.getURL().toExternalForm(), statusCode, request});
 
-            // transfer to the correct relationship
-            // 2xx -> SUCCESS
-            if (isSuccess()) {
-                // we have two flowfiles to transfer
-                session.transfer(request, REL_SUCCESS_REQ);
-                session.transfer(response, REL_SUCCESS_RESP);
+    private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) {
+        Request.Builder requestBuilder = new Request.Builder();
 
-                // 5xx -> RETRY
-            } else if (statusCode / 100 == 5) {
-                session.transfer(request, REL_RETRY);
+        requestBuilder = requestBuilder.url(url);
+        final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
 
-                // 1xx, 3xx, 4xx -> NO RETRY
-            } else {
-                session.transfer(request, REL_NO_RETRY);
-            }
+        // If the username/password properties are set then check if digest auth is being used
+        if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
+            final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
 
+            String credential = com.squareup.okhttp.Credentials.basic(authUser, authPass);
+            requestBuilder = requestBuilder.header("Authorization", credential);
         }
 
-        private void setRequestProperties() {
+        // set the request method
+        String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
+        switch (method) {
+            case "GET":
+                requestBuilder = requestBuilder.get();
+                break;
+            case "POST":
+                RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile);
+                requestBuilder = requestBuilder.post(requestBody);
+                break;
+            case "PUT":
+                requestBody = getRequestBodyToSend(session, requestFlowFile);
+                requestBuilder = requestBuilder.put(requestBody);
+                break;
+            case "HEAD":
+                requestBuilder = requestBuilder.head();
+                break;
+            case "DELETE":
+                requestBuilder = requestBuilder.delete();
+                break;
+            default:
+                requestBuilder = requestBuilder.method(method, null);
+        }
 
-            // check if we should send the a Date header with the request
-            if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
-                conn.setRequestProperty("Date", getDateValue());
-            }
+        requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile);
 
-            // iterate through the flowfile attributes, adding any attribute that
-            // matches the attributes-to-send pattern. if the pattern is not set
-            // (it's an optional property), ignore that attribute entirely
-            if (attributesToSend != null) {
-                Map<String, String> attributes = request.getAttributes();
-                Matcher m = attributesToSend.matcher("");
-                for (Map.Entry<String, String> entry : attributes.entrySet()) {
-                    String key = trimToEmpty(entry.getKey());
-                    String val = trimToEmpty(entry.getValue());
-
-                    // don't include any of the ignored attributes
-                    if (IGNORED_ATTRIBUTES.contains(key)) {
-                        continue;
-                    }
+        return requestBuilder.build();
+    }
 
-                    // check if our attribute key matches the pattern
-                    // if so, include in the request as a header
-                    m.reset(key);
-                    if (m.matches()) {
-                        conn.setRequestProperty(key, val);
-                    }
-                }
+    private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) {
+        return new RequestBody() {
+            @Override
+            public MediaType contentType() {
+                final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+                String contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue;
+                return MediaType.parse(contentType);
             }
+
+            @Override
+            public void writeTo(BufferedSink sink) throws IOException {
+                session.exportTo(requestFlowFile, sink.outputStream());
+            }
+        };
+    }
+
+    private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {
+        // check if we should send the a Date header with the request
+        if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
+            requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis()));
+        }
+
+        for (String headerKey : dynamicPropertyNames) {
+            String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue();
+            requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
         }
 
-        /**
-         * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
-         */
-        private Map<String, String> convertAttributesFromHeaders() throws IOException {
-            // create a new hashmap to store the values from the connection
-            Map<String, String> map = new HashMap<>();
-            for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) {
-                String key = entry.getKey();
-                if (key == null) {
+        // iterate through the flowfile attributes, adding any attribute that
+        // matches the attributes-to-send pattern. if the pattern is not set
+        // (it's an optional property), ignore that attribute entirely
+        if (regexAttributesToSend != null) {
+            Map<String, String> attributes = requestFlowFile.getAttributes();
+            Matcher m = regexAttributesToSend.matcher("");
+            for (Map.Entry<String, String> entry : attributes.entrySet()) {
+                String headerKey = trimToEmpty(entry.getKey());
+
+                // don't include any of the ignored attributes
+                if (IGNORED_ATTRIBUTES.contains(headerKey)) {
                     continue;
                 }
 
-                List<String> values = entry.getValue();
-
-                // we ignore any headers with no actual values (rare)
-                if (values == null || values.isEmpty()) {
-                    continue;
+                // check if our attribute key matches the pattern
+                // if so, include in the request as a header
+                m.reset(headerKey);
+                if (m.matches()) {
+                    String headerVal = trimToEmpty(entry.getValue());
+                    requestBuilder = requestBuilder.addHeader(headerKey, headerVal);
                 }
+            }
+        }
+        return requestBuilder;
+    }
 
-                // create a comma separated string from the values, this is stored in the map
-                String value = csv(values);
 
-                // put the csv into the map
-                map.put(key, value);
+    private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){
+        // check if we should penalize the request
+        if (!isSuccess(statusCode)) {
+            if (request == null) {
+                context.yield();
+            } else {
+                request = session.penalize(request);
             }
+        }
 
-            if (conn instanceof HttpsURLConnection) {
-                HttpsURLConnection sconn = (HttpsURLConnection) conn;
-                // this should seemingly not be required, but somehow the state of the jdk client is messed up
-                // when retrieving SSL certificate related information if connect() has not been called previously.
-                sconn.connect();
-                map.put(REMOTE_DN, sconn.getPeerPrincipal().getName());
+        // If the property to output the response flowfile regardless of status code is set then transfer it
+        boolean responseSent = false;
+        if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
+            session.transfer(response, REL_RESPONSE);
+            responseSent = true;
+        }
+
+        // transfer to the correct relationship
+        // 2xx -> SUCCESS
+        if (isSuccess(statusCode)) {
+            // we have two flowfiles to transfer
+            if (request != null) {
+                session.transfer(request, REL_SUCCESS_REQ);
+            }
+            if (response != null && !responseSent) {
+                session.transfer(response, REL_RESPONSE);
             }
 
-            return map;
-        }
+            // 5xx -> RETRY
+        } else if (statusCode / 100 == 5) {
+            if (request != null) {
+                session.transfer(request, REL_RETRY);
+            }
 
-        private boolean isSuccess() throws IOException {
-            if (statusCode == 0) {
-                throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
+            // 1xx, 3xx, 4xx -> NO RETRY
+        } else {
+            if (request != null) {
+                session.transfer(request, REL_NO_RETRY);
             }
-            return statusCode / 100 == 2;
         }
 
-        private void logRequest() {
-            logger.debug("\nRequest to remote service:\n\t{}\n{}",
-                new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
-        }
+    }
 
-        private void logResponse() {
-            logger.debug("\nResponse from remote service:\n\t{}\n{}",
-                new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
-        }
+    private boolean isSuccess(int statusCode) {
+        return statusCode / 100 == 2;
+    }
 
-        private String getLogString(Map<String, List<String>> map) {
-            StringBuilder sb = new StringBuilder();
-            for (Map.Entry<String, List<String>> entry : map.entrySet()) {
-                List<String> list = entry.getValue();
-                if (list.isEmpty()) {
-                    continue;
-                }
-                sb.append("\t");
-                sb.append(entry.getKey());
-                sb.append(": ");
-                if (list.size() == 1) {
-                    sb.append(list.get(0));
-                } else {
-                    sb.append(list.toString());
-                }
-                sb.append("\n");
-            }
-            return sb.toString();
-        }
+    private void logRequest(ProcessorLog logger, Request request) {
+        logger.debug("\nRequest to remote service:\n\t{}\n{}",
+                new Object[]{request.url().toExternalForm(), getLogString(request.headers().toMultimap())});
+    }
 
-        /**
-         * Convert a collection of string values into a overly simple comma separated string.
-         *
-         * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
-         */
-        private String csv(Collection<String> values) {
-            if (values == null || values.isEmpty()) {
-                return "";
-            }
-            if (values.size() == 1) {
-                return values.iterator().next();
-            }
+    private void logResponse(ProcessorLog logger, URL url, Response response) {
+        logger.debug("\nResponse from remote service:\n\t{}\n{}",
+                new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())});
+    }
 
-            StringBuilder sb = new StringBuilder();
-            for (String value : values) {
-                value = value.trim();
-                if (value.isEmpty()) {
-                    continue;
-                }
-                if (sb.length() > 0) {
-                    sb.append(", ");
-                }
-                sb.append(value);
+    private String getLogString(Map<String, List<String>> map) {
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+            List<String> list = entry.getValue();
+            if (list.isEmpty()) {
+                continue;
+            }
+            sb.append("\t");
+            sb.append(entry.getKey());
+            sb.append(": ");
+            if (list.size() == 1) {
+                sb.append(list.get(0));
+            } else {
+                sb.append(list.toString());
             }
-            return sb.toString().trim();
+            sb.append("\n");
         }
+        return sb.toString();
+    }
 
-        /**
-         * Return the current datetime as an RFC 1123 formatted string in the GMT tz.
-         */
-        private String getDateValue() {
-            return dateFormat.print(System.currentTimeMillis());
+    /**
+     * Convert a collection of string values into a overly simple comma separated string.
+     * <p/>
+     * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
+     */
+    private String csv(Collection<String> values) {
+        if (values == null || values.isEmpty()) {
+            return "";
+        }
+        if (values.size() == 1) {
+            return values.iterator().next();
         }
 
-        /**
-         * Returns a string from the input stream using the specified character encoding.
-         */
-        private String toString(InputStream is, Charset charset) throws IOException {
-            if (is == null) {
-                return "";
+        StringBuilder sb = new StringBuilder();
+        for (String value : values) {
+            value = value.trim();
+            if (value.isEmpty()) {
+                continue;
             }
-
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            byte[] buf = new byte[4096];
-            int len;
-            while ((len = is.read(buf)) != -1) {
-                out.write(buf, 0, len);
+            if (sb.length() > 0) {
+                sb.append(", ");
             }
-            return new String(out.toByteArray(), charset);
+            sb.append(value);
         }
+        return sb.toString().trim();
+    }
 
-        /**
-         * Returns the input stream to use for reading from the remote server. We're either going to want the inputstream or errorstream, effectively depending on the status code.
-         * <p>
-         * This method can return null if there is no inputstream to read from. For example, if the remote server did not send a message body. eg. 204 No Content or 304 Not Modified
-         */
-        private InputStream getResponseStream() {
-            try {
-                InputStream is = conn.getErrorStream();
-                if (is == null) {
-                    is = conn.getInputStream();
-                }
-                return new BufferedInputStream(is);
+    /**
+     * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
+     */
+    private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp){
+        // create a new hashmap to store the values from the connection
+        Map<String, String> map = new HashMap<>();
+        for (Map.Entry<String, List<String>> entry : responseHttp.headers().toMultimap().entrySet()) {
+            String key = entry.getKey();
+            if (key == null) {
+                continue;
+            }
 
-            } catch (IOException e) {
-                logger.warn("Response stream threw an exception: {}", new Object[] {e}, e);
-                return null;
+            List<String> values = entry.getValue();
+
+            // we ignore any headers with no actual values (rare)
+            if (values == null || values.isEmpty()) {
+                continue;
             }
+
+            // create a comma separated string from the values, this is stored in the map
+            String value = csv(values);
+
+            // put the csv into the map
+            map.put(key, value);
         }
 
-        /**
-         * Writes the status attributes onto the flowfile, returning the flowfile that was updated.
-         */
-        private FlowFile writeStatusAttributes(FlowFile flowfile) {
-            flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode));
-            flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, statusMessage);
-            flowfile = session.putAttribute(flowfile, REQUEST_URL, conn.getURL().toExternalForm());
-            flowfile = session.putAttribute(flowfile, TRANSACTION_ID, Long.toString(txId));
-            return flowfile;
+        if ("HTTPS".equals(url.getProtocol().toUpperCase())) {
+            map.put(REMOTE_DN, responseHttp.handshake().peerPrincipal().getName());
         }
 
-        /**
-         *
-         */
-        private static class OverrideHostnameVerifier implements HostnameVerifier {
+        return map;
+    }
 
-            private final String trustedHostname;
-            private final HostnameVerifier delegate;
+    private Charset getCharsetFromMediaType(MediaType contentType) {
+        return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8;
+    }
 
-            private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
-                this.trustedHostname = trustedHostname;
-                this.delegate = delegate;
-            }
+    private static class OverrideHostnameVerifier implements HostnameVerifier {
 
-            @Override
-            public boolean verify(String hostname, SSLSession session) {
-                if (trustedHostname.equalsIgnoreCase(hostname)) {
-                    return true;
-                }
-                return delegate.verify(hostname, session);
+        private final String trustedHostname;
+        private final HostnameVerifier delegate;
+
+        private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
+            this.trustedHostname = trustedHostname;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean verify(String hostname, SSLSession session) {
+            if (trustedHostname.equalsIgnoreCase(hostname)) {
+                return true;
             }
+            return delegate.verify(hostname, session);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index a26b2ed..a82bc5a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -19,22 +19,27 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
+import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunners;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 public class TestInvokeHTTP extends TestInvokeHttpCommon {
 
     @BeforeClass
@@ -72,41 +77,99 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
         return new TestServer();
     }
 
+    @Test
+    public void testSslSetHttpRequest() throws Exception {
+
+        final Map<String, String> sslProperties = new HashMap<>();
+        sslProperties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
+        sslProperties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
+        sslProperties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+        sslProperties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
+        sslProperties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
+        sslProperties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+
+        runner = TestRunners.newTestRunner(InvokeHTTP.class);
+        final StandardSSLContextService sslService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslService, sslProperties);
+        runner.enableControllerService(sslService);
+        runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+
+        addHandler(new GetOrHeadHandler());
+
+        runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+
+        createFlowFiles(runner);
+
+        runner.run();
+
+        runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+        // expected in request status.code and status.message
+        // original flow file (+attributes)
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+        bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+        bundle.assertAttributeEquals("Foo", "Bar");
+
+        // expected in response
+        // status code, status message, all headers from server response --> ff attributes
+        // server response message body into payload of ff
+        final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+        bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+        bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+        bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+        bundle1.assertAttributeEquals("Foo", "Bar");
+        bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+        bundle1.assertAttributeEquals("OkHttp-Selected-Protocol", "http/1.1");
+    }
+
     // Currently InvokeHttp does not support Proxy via Https
     @Test
     public void testProxy() throws Exception {
         addHandler(new MyProxyHandler());
         URL proxyURL = new URL(url);
 
-        runner.setProperty(InvokeHTTP.Config.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
-        runner.setProperty(InvokeHTTP.Config.PROP_PROXY_HOST, proxyURL.getHost());
-        runner.setProperty(InvokeHTTP.Config.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
+        runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
+        runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, proxyURL.getHost());
+
+        try{
+            runner.run();
+            Assert.fail();
+        } catch (AssertionError e){
+            // Expect assetion error when proxy port isn't set but host is.
+        }
+        runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
 
         createFlowFiles(runner);
 
         runner.run();
 
-        runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
-        runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
-        runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
-        runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
-        runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+        runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+        runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
 
         //expected in request status.code and status.message
         //original flow file (+attributes)
-        final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
         bundle.assertContentEquals("Hello".getBytes("UTF-8"));
-        bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
-        bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+        bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
         bundle.assertAttributeEquals("Foo", "Bar");
 
         //expected in response
         //status code, status message, all headers from server response --> ff attributes
         //server response message body into payload of ff
-        final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+        final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
         bundle1.assertContentEquals("http://nifi.apache.org/".getBytes("UTF-8"));
-        bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
-        bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+        bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+        bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
         bundle1.assertAttributeEquals("Foo", "Bar");
         bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8c2323dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
index d155b74..b3cd9dc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import org.apache.nifi.processors.standard.InvokeHTTP.Config;
 import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
 import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.TestRunners;
@@ -63,7 +62,7 @@ public class TestInvokeHttpSSL extends TestInvokeHttpCommon {
         final StandardSSLContextService sslService = new StandardSSLContextService();
         runner.addControllerService("ssl-context", sslService, sslProperties);
         runner.enableControllerService(sslService);
-        runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+        runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
 
         server.clearHandlers();
     }