You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/03/15 17:28:55 UTC
[nifi] branch main updated: NIFI-8304 Refactored InvokeHTTP unit
tests using OkHttp MockWebServer
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b1f513a NIFI-8304 Refactored InvokeHTTP unit tests using OkHttp MockWebServer
b1f513a is described below
commit b1f513a2caeb043abc8ff77d2230bad87fd8a2de
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Mar 11 17:39:40 2021 -0600
NIFI-8304 Refactored InvokeHTTP unit tests using OkHttp MockWebServer
- Replaced Joda Time with java.time for date formatting
- Replaced Guava Files with java.nio.file.Files for cache directory
- Updated PutTCP test server to close connection when testing connection per FlowFile
NIFI-8304 Removed Thread wrapper for TestListenHTTP client requests
NIFI-8304 Disabled InvokeHTTP Connection Pooling for testing
NIFI-8304 Set 60 second timeout for testing TLS connections
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4892.
---
.../nifi-standard-processors/pom.xml | 6 +
.../nifi/processors/standard/InvokeHTTP.java | 309 +--
.../nifi/processors/standard/InvokeHTTPTest.java | 859 ++++++++
.../nifi/processors/standard/TestInvokeHTTP.java | 448 ----
.../processors/standard/TestInvokeHttpSSL.java | 117 --
.../standard/TestInvokeHttpTwoWaySSL.java | 83 -
.../nifi/processors/standard/TestListenHTTP.java | 177 +-
.../processors/standard/util/TCPTestServer.java | 40 +-
.../standard/util/TestInvokeHttpCommon.java | 2187 --------------------
.../processors/standard/util/TestPutTCPCommon.java | 76 +-
10 files changed, 1090 insertions(+), 3212 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index d3d9290..2dab811 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -401,6 +401,12 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 07dd549..e2954a7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -20,7 +20,6 @@ import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
-import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -29,7 +28,11 @@ import java.net.Proxy.Type;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.security.Principal;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -37,7 +40,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -47,15 +49,15 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.Cache;
import okhttp3.ConnectionPool;
import okhttp3.Credentials;
+import okhttp3.Handshake;
+import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.MultipartBody.Builder;
@@ -90,7 +92,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -103,8 +104,6 @@ import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.StreamUtils;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@@ -114,15 +113,15 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@CapabilityDescription("An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Method are configurable."
+ " FlowFile attributes are converted to HTTP headers and the FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).")
@WritesAttributes({
- @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
- @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
- @WritesAttribute(attribute = "invokehttp.response.body", description = "In the instance where the status code received is not a success (2xx) "
+ @WritesAttribute(attribute = InvokeHTTP.STATUS_CODE, description = "The status code that is returned"),
+ @WritesAttribute(attribute = InvokeHTTP.STATUS_MESSAGE, description = "The status message that is returned"),
+ @WritesAttribute(attribute = InvokeHTTP.RESPONSE_BODY, description = "In the instance where the status code received is not a success (2xx) "
+ "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."),
- @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
- @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
- @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server"),
- @WritesAttribute(attribute = "invokehttp.java.exception.class", description = "The Java exception class raised when the processor fails"),
- @WritesAttribute(attribute = "invokehttp.java.exception.message", description = "The Java exception message raised when the processor fails"),
+ @WritesAttribute(attribute = InvokeHTTP.REQUEST_URL, description = "The request URL"),
+ @WritesAttribute(attribute = InvokeHTTP.TRANSACTION_ID, description = "The transaction ID that is returned after reading the response"),
+ @WritesAttribute(attribute = InvokeHTTP.REMOTE_DN, description = "The DN of the remote server"),
+ @WritesAttribute(attribute = InvokeHTTP.EXCEPTION_CLASS, description = "The Java exception class raised when the processor fails"),
+ @WritesAttribute(attribute = InvokeHTTP.EXCEPTION_MESSAGE, description = "The Java exception message raised when the processor fails"),
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be the body of the HTTP response.")})
@DynamicProperties({
@@ -137,7 +136,6 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty;
+ " If send message body is false, the flowfile will not be sent, but any other form data will be.")
})
public class InvokeHTTP extends AbstractProcessor {
- // flowfile attribute keys returned after reading the response
public final static String STATUS_CODE = "invokehttp.status.code";
public final static String STATUS_MESSAGE = "invokehttp.status.message";
public final static String RESPONSE_BODY = "invokehttp.response.body";
@@ -147,7 +145,6 @@ public class InvokeHTTP extends AbstractProcessor {
public final static String EXCEPTION_CLASS = "invokehttp.java.exception.class";
public final static String EXCEPTION_MESSAGE = "invokehttp.java.exception.message";
-
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
public static final String FORM_BASE = "post:form";
@@ -161,13 +158,11 @@ public class InvokeHTTP extends AbstractProcessor {
EXCEPTION_CLASS, EXCEPTION_MESSAGE,
"uuid", "filename", "path")));
- // Set of HTTP header names explicitly excluded from requests.
- private static final Map<String, String> excludedHeaders = new HashMap<String, String>();
-
public static final String HTTP = "http";
public static final String HTTPS = "https";
private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$");
+ private static final String FORM_DATA_NAME_GROUP = "formDataName";
// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
@@ -332,9 +327,9 @@ public class InvokeHTTP extends AbstractProcessor {
public static final PropertyDescriptor PROP_FORM_BODY_FORM_NAME = new PropertyDescriptor.Builder()
.name("form-body-form-name")
- .displayName("Flowfile Form Data Name")
- .description("When Send Message Body is true, and Flowfile Form Data Name is set, "
- + " the Flowfile will be sent as the message body in multipart/form format with this value "
+ .displayName("FlowFile Form Data Name")
+ .description("When Send Message Body is true, and FlowFile Form Data Name is set, "
+ + " the FlowFile will be sent as the message body in multipart/form format with this value "
+ "as the form data name.")
.required(false)
.addValidator(
@@ -344,10 +339,10 @@ public class InvokeHTTP extends AbstractProcessor {
public static final PropertyDescriptor PROP_SET_FORM_FILE_NAME = new PropertyDescriptor.Builder()
.name("set-form-filename")
- .displayName("Set Flowfile Form Data File Name")
+ .displayName("Set FlowFile Form Data File Name")
.description(
- "When Send Message Body is true, Flowfile Form Data Name is set, "
- + "and Set Flowfile Form Data File Name is true, the Flowfile's fileName value "
+ "When Send Message Body is true, FlowFile Form Data Name is set, "
+ + "and Set FlowFile Form Data File Name is true, the FlowFile's fileName value "
+ "will be set as the filename property of the form data.")
.required(false)
.defaultValue("true")
@@ -558,24 +553,19 @@ public class InvokeHTTP extends AbstractProcessor {
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
- private volatile Set<String> dynamicPropertyNames = new HashSet<>();
+ // RFC 2616 Date Time Formatter with hard-coded GMT Zone
+ private static final DateTimeFormatter RFC_2616_DATE_TIME = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'");
- /**
- * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
- * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
- */
- private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
- private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC();
+ // Multiple Header Delimiter
+ private static final String MULTIPLE_HEADER_DELIMITER = ", ";
- private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
+ private volatile Set<String> dynamicPropertyNames = new HashSet<>();
- @Override
- protected void init(ProcessorInitializationContext context) {
- excludedHeaders.put("Trusted Hostname", "HTTP request header '{}' excluded. " +
- "Update processor to use the SSLContextService instead. " +
- "See the Access Policies section in the System Administrator's Guide.");
+ private volatile Pattern regexAttributesToSend = null;
- }
+ private volatile boolean useChunked = false;
+
+ private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -591,7 +581,7 @@ public class InvokeHTTP extends AbstractProcessor {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
- .description("Form Data " + matcher.group("formDataName"))
+ .description("Form Data " + matcher.group(FORM_DATA_NAME_GROUP))
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -614,9 +604,6 @@ public class InvokeHTTP extends AbstractProcessor {
return RELATIONSHIPS;
}
- private volatile Pattern regexAttributesToSend = null;
- private volatile boolean useChunked = false;
-
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.isDynamic()) {
@@ -674,20 +661,11 @@ public class InvokeHTTP extends AbstractProcessor {
ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS);
- for (String headerKey : validationContext.getProperties().values()) {
- if (excludedHeaders.containsKey(headerKey)) {
- // We're not using the header message format string here, just this
- // static validation message string:
- results.add(new ValidationResult.Builder().subject(headerKey).valid(false).explanation("Matches excluded HTTP header name").build());
- }
- }
-
// Check for dynamic properties for form components.
// Even if the flowfile is not sent, we may still send form parameters.
boolean hasFormData = false;
- Map<String, PropertyDescriptor> propertyDescriptors = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry : validationContext.getProperties().entrySet()) {
- Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
+ for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
+ final Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(descriptor.getName());
if (matcher.matches()) {
hasFormData = true;
break;
@@ -699,15 +677,24 @@ public class InvokeHTTP extends AbstractProcessor {
final boolean contentNameSet = validationContext.getProperty(PROP_FORM_BODY_FORM_NAME).isSet();
if (hasFormData) {
if (sendBody && !contentNameSet) {
- results.add(new ValidationResult.Builder().subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
- .valid(false).explanation(
- "If dynamic form data properties are set, and send body is true, Flowfile Form Data Name must be configured.")
+ final String explanation = String.format("[%s] is required when Form Data properties are configured and [%s] is enabled",
+ PROP_FORM_BODY_FORM_NAME.getDisplayName(),
+ PROP_SEND_BODY.getDisplayName());
+ results.add(new ValidationResult.Builder()
+ .subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
+ .valid(false)
+ .explanation(explanation)
.build());
}
}
if (!sendBody && contentNameSet) {
- results.add(new ValidationResult.Builder().subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
- .valid(false).explanation("If Flowfile Form Data Name is configured, Send Message Body must be true.")
+ final String explanation = String.format("[%s] must be [true] when Form Data properties are configured and [%s] is configured",
+ PROP_SEND_BODY.getDisplayName(),
+ PROP_FORM_BODY_FORM_NAME.getDisplayName());
+ results.add(new ValidationResult.Builder()
+ .subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
+ .valid(false)
+ .explanation(explanation)
.build());
}
@@ -715,13 +702,11 @@ public class InvokeHTTP extends AbstractProcessor {
}
@OnScheduled
- public void setUpClient(final ProcessContext context) throws TlsException {
+ public void setUpClient(final ProcessContext context) throws TlsException, IOException {
okHttpClientAtomicReference.set(null);
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
- // Add a proxy if set
- boolean isHttpsProxy = HTTPS.equals(context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue());
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
@@ -754,11 +739,9 @@ public class InvokeHTTP extends AbstractProcessor {
okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes));
}
- // Configure whether HTTP/2 protocol should be used or not
+ // Configure whether HTTP/2 protocol should be disabled
if (context.getProperty(DISABLE_HTTP2_PROTOCOL).asBoolean()) {
- okHttpClientBuilder.protocols(Arrays.asList(Protocol.HTTP_1_1));
- } else {
- okHttpClientBuilder.protocols(Arrays.asList(Protocol.HTTP_1_1, Protocol.HTTP_2));
+ okHttpClientBuilder.protocols(Collections.singletonList(Protocol.HTTP_1_1));
}
// Set timeouts
@@ -839,22 +822,14 @@ public class InvokeHTTP extends AbstractProcessor {
final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
final ComponentLog logger = getLogger();
- // log ETag cache metrics
- final boolean eTagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean();
- if (eTagEnabled && logger.isDebugEnabled()) {
- final Cache cache = okHttpClient.cache();
- logger.debug("OkHttp ETag cache metrics :: Request Count: {} | Network Count: {} | Hit Count: {}",
- new Object[]{cache.requestCount(), cache.networkCount(), cache.hitCount()});
- }
-
// Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
final UUID txId = UUID.randomUUID();
FlowFile responseFlowFile = null;
try {
// read the url property from the context
- final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
- final URL url = new URL(urlstr);
+ final String urlProperty = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
+ final URL url = new URL(urlProperty);
Request httpRequest = configureRequest(context, session, requestFlowFile, url);
@@ -876,10 +851,6 @@ public class InvokeHTTP extends AbstractProcessor {
int statusCode = responseHttp.code();
String statusMessage = responseHttp.message();
- if (statusCode == 0) {
- throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
- }
-
// Create a map of the status attributes that are always written to the request and response FlowFiles
Map<String, String> statusAttributes = new HashMap<>();
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
@@ -895,7 +866,7 @@ public class InvokeHTTP extends AbstractProcessor {
if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
- requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
+ requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp));
}
boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
@@ -931,14 +902,15 @@ public class InvokeHTTP extends AbstractProcessor {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
- responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
+ responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp));
// transfer the message body to the payload
// can potentially be null in edge cases
if (bodyExists) {
// write content type attribute to response flowfile if it is available
- if (responseBody.contentType() != null) {
- responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), responseBody.contentType().toString());
+ final MediaType contentType = responseBody.contentType();
+ if (contentType != null) {
+ responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentType.toString());
}
if (teeInputStream != null) {
responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
@@ -982,14 +954,11 @@ public class InvokeHTTP extends AbstractProcessor {
} finally {
if (outputStreamToRequestAttribute != null) {
outputStreamToRequestAttribute.close();
- outputStreamToRequestAttribute = null;
}
if (teeInputStream != null) {
teeInputStream.close();
- teeInputStream = null;
} else if (responseBodyStream != null) {
responseBodyStream.close();
- responseBodyStream = null;
}
}
@@ -1012,21 +981,17 @@ public class InvokeHTTP extends AbstractProcessor {
// cleanup response flowfile, if applicable
- try {
- if (responseFlowFile != null) {
- session.remove(responseFlowFile);
- }
- } catch (final Exception e1) {
- logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
+ if (responseFlowFile != null) {
+ session.remove(responseFlowFile);
}
}
}
private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) {
- Request.Builder requestBuilder = new Request.Builder();
+ final Request.Builder requestBuilder = new Request.Builder();
- requestBuilder = requestBuilder.url(url);
+ requestBuilder.url(url);
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
// If the username/password properties are set then check if digest auth is being used
@@ -1034,41 +999,41 @@ public class InvokeHTTP extends AbstractProcessor {
final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
String credential = Credentials.basic(authUser, authPass);
- requestBuilder = requestBuilder.header("Authorization", credential);
+ requestBuilder.header("Authorization", credential);
}
// set the request method
String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
switch (method) {
case "GET":
- requestBuilder = requestBuilder.get();
+ requestBuilder.get();
break;
case "POST":
RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile);
- requestBuilder = requestBuilder.post(requestBody);
+ requestBuilder.post(requestBody);
break;
case "PUT":
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
- requestBuilder = requestBuilder.put(requestBody);
+ requestBuilder.put(requestBody);
break;
case "PATCH":
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
- requestBuilder = requestBuilder.patch(requestBody);
+ requestBuilder.patch(requestBody);
break;
case "HEAD":
- requestBuilder = requestBuilder.head();
+ requestBuilder.head();
break;
case "DELETE":
- requestBuilder = requestBuilder.delete();
+ requestBuilder.delete();
break;
default:
- requestBuilder = requestBuilder.method(method, null);
+ requestBuilder.method(method, null);
}
String userAgent = trimToEmpty(context.getProperty(PROP_USERAGENT).evaluateAttributeExpressions(requestFlowFile).getValue());
requestBuilder.addHeader("User-Agent", userAgent);
- requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile);
+ setHeaderProperties(context, requestBuilder, requestFlowFile);
return requestBuilder.build();
}
@@ -1089,7 +1054,7 @@ public class InvokeHTTP extends AbstractProcessor {
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
if (matcher.matches()) {
- propertyDescriptors.put(matcher.group("formDataName"), entry.getKey());
+ propertyDescriptors.put(matcher.group(FORM_DATA_NAME_GROUP), entry.getKey());
}
}
@@ -1101,7 +1066,7 @@ public class InvokeHTTP extends AbstractProcessor {
}
@Override
- public void writeTo(BufferedSink sink) throws IOException {
+ public void writeTo(BufferedSink sink) {
session.exportTo(requestFlowFile, sink.outputStream());
}
@@ -1132,31 +1097,25 @@ public class InvokeHTTP extends AbstractProcessor {
} else if (sendBody) {
return requestBody;
}
- return RequestBody.create(null, new byte[0]);
+ return RequestBody.create(new byte[0], null);
}
- private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {
+ private void setHeaderProperties(final ProcessContext context, final Request.Builder requestBuilder, final FlowFile requestFlowFile) {
// check if we should send the a Date header with the request
if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
- requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis()));
+ final ZonedDateTime universalCoordinatedTimeNow = ZonedDateTime.now(ZoneOffset.UTC);
+ requestBuilder.addHeader("Date", RFC_2616_DATE_TIME.format(universalCoordinatedTimeNow));
}
- final ComponentLog logger = getLogger();
for (String headerKey : dynamicPropertyNames) {
String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue();
- // don't include any of the excluded headers, log instead
- if (excludedHeaders.containsKey(headerKey)) {
- logger.warn(excludedHeaders.get(headerKey), new Object[]{headerKey});
- continue;
- }
-
// don't include dynamic form data properties
if (DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) {
continue;
}
- requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
+ requestBuilder.addHeader(headerKey, headerValue);
}
// iterate through the flowfile attributes, adding any attribute that
@@ -1178,11 +1137,10 @@ public class InvokeHTTP extends AbstractProcessor {
m.reset(headerKey);
if (m.matches()) {
String headerVal = trimToEmpty(entry.getValue());
- requestBuilder = requestBuilder.addHeader(headerKey, headerVal);
+ requestBuilder.addHeader(headerKey, headerVal);
}
}
}
- return requestBuilder;
}
@@ -1235,95 +1193,59 @@ public class InvokeHTTP extends AbstractProcessor {
private void logRequest(ComponentLog logger, Request request) {
logger.debug("\nRequest to remote service:\n\t{}\n{}",
- new Object[]{request.url().url().toExternalForm(), getLogString(request.headers().toMultimap())});
+ request.url().url().toExternalForm(), getLogString(request.headers().toMultimap()));
}
private void logResponse(ComponentLog logger, URL url, Response response) {
logger.debug("\nResponse from remote service:\n\t{}\n{}",
- new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())});
+ url.toExternalForm(), getLogString(response.headers().toMultimap()));
}
private String getLogString(Map<String, List<String>> map) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
List<String> list = entry.getValue();
- if (list.isEmpty()) {
- continue;
- }
- sb.append("\t");
- sb.append(entry.getKey());
- sb.append(": ");
- if (list.size() == 1) {
- sb.append(list.get(0));
- } else {
- sb.append(list.toString());
+ if (!list.isEmpty()) {
+ sb.append("\t");
+ sb.append(entry.getKey());
+ sb.append(": ");
+ if (list.size() == 1) {
+ sb.append(list.get(0));
+ } else {
+ sb.append(list.toString());
+ }
+ sb.append("\n");
}
- sb.append("\n");
}
return sb.toString();
}
/**
- * Convert a collection of string values into a overly simple comma separated string.
- * <p/>
- * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
- */
- private String csv(Collection<String> values) {
- if (values == null || values.isEmpty()) {
- return "";
- }
- if (values.size() == 1) {
- return values.iterator().next();
- }
-
- StringBuilder sb = new StringBuilder();
- for (String value : values) {
- value = value.trim();
- if (value.isEmpty()) {
- continue;
- }
- if (sb.length() > 0) {
- sb.append(", ");
- }
- sb.append(value);
- }
- return sb.toString().trim();
- }
-
- /**
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
*/
- private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp) {
+ private Map<String, String> convertAttributesFromHeaders(final Response responseHttp) {
// create a new hashmap to store the values from the connection
- Map<String, String> map = new HashMap<>();
- responseHttp.headers().names().forEach((key) -> {
- if (key == null) {
- return;
- }
-
- List<String> values = responseHttp.headers().values(key);
-
+ final Map<String, String> attributes = new HashMap<>();
+ final Headers headers = responseHttp.headers();
+ headers.names().forEach((key) -> {
+ final List<String> values = headers.values(key);
// we ignore any headers with no actual values (rare)
- if (values == null || values.isEmpty()) {
- return;
+ if (!values.isEmpty()) {
+ // create a comma separated string from the values, this is stored in the map
+ final String value = StringUtils.join(values, MULTIPLE_HEADER_DELIMITER);
+ attributes.put(key, value);
}
-
- // create a comma separated string from the values, this is stored in the map
- String value = csv(values);
-
- // put the csv into the map
- map.put(key, value);
});
- if (responseHttp.request().isHttps()) {
- Principal principal = responseHttp.handshake().peerPrincipal();
-
+ final Handshake handshake = responseHttp.handshake();
+ if (handshake != null) {
+ final Principal principal = handshake.peerPrincipal();
if (principal != null) {
- map.put(REMOTE_DN, principal.getName());
+ attributes.put(REMOTE_DN, principal.getName());
}
}
- return map;
+ return attributes;
}
private Charset getCharsetFromMediaType(MediaType contentType) {
@@ -1339,26 +1261,7 @@ public class InvokeHTTP extends AbstractProcessor {
*
* @return the directory in which the ETag cache should be written
*/
- private static File getETagCacheDir() {
- return Files.createTempDir();
- }
-
- private static class OverrideHostnameVerifier implements HostnameVerifier {
-
- private final String trustedHostname;
- private final HostnameVerifier delegate;
-
- private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
- this.trustedHostname = trustedHostname;
- this.delegate = delegate;
- }
-
- @Override
- public boolean verify(String hostname, SSLSession session) {
- if (trustedHostname.equalsIgnoreCase(hostname)) {
- return true;
- }
- return delegate.verify(hostname, session);
- }
+ private static File getETagCacheDir() throws IOException {
+ return Files.createTempDirectory(InvokeHTTP.class.getSimpleName()).toFile();
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
new file mode 100644
index 0000000..ed60215
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/InvokeHTTPTest.java
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.StandardTlsConfiguration;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.LogMessage;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_MOVED_TEMP;
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class InvokeHTTPTest {
+ private static final String HTTP_LOCALHOST_URL = "http://localhost"; // URL used by the property validation tests, which never call run()
+
+ private static final String LOCALHOST = "localhost"; // host name substituted into the MockWebServer URL by getMockWebServerUrl()
+
+ private static final String BASE_PATH = "/";
+
+ private static final String POST_FORM_PARAMETER_KEY = "post:form:parameter"; // dynamic property name used by the form parameter validation tests
+
+ private static final String DATE_HEADER = "Date";
+
+ private static final String ACCEPT_HEADER = "Accept";
+
+ private static final String AUTHORIZATION_HEADER = "Authorization";
+
+ private static final String CONTENT_LENGTH_HEADER = "Content-Length";
+
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+
+ private static final String LOCATION_HEADER = "Location";
+
+ private static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
+
+ private static final String USER_AGENT_HEADER = "User-Agent";
+
+ private static final String AUTHENTICATE_HEADER = "WWW-Authenticate";
+
+ private static final String REPEATED_HEADER = "Repeated"; // response header added twice to check how repeated values are joined
+
+ private static final String GET_METHOD = "GET";
+
+ private static final String DELETE_METHOD = "DELETE";
+
+ private static final String HEAD_METHOD = "HEAD";
+
+ private static final String OPTIONS_METHOD = "OPTIONS";
+
+ private static final String POST_METHOD = "POST";
+
+ private static final String PATCH_METHOD = "PATCH";
+
+ private static final String PUT_METHOD = "PUT";
+
+ private static final String TEXT_PLAIN = "text/plain";
+
+ private static final String FLOW_FILE_CONTENT = String.class.getName(); // arbitrary content for FlowFiles enqueued by the tests
+
+ private static final int TAKE_REQUEST_COMPLETED_TIMEOUT = 1; // seconds: passed to takeRequest() together with TimeUnit.SECONDS
+
+ private static final String TLS_CONNECTION_TIMEOUT = "60 s"; // applied to both read and connect timeouts in setSslContextService()
+
+ private static TlsConfiguration generatedTlsConfiguration; // created once per class in setStores(); its store files are deleted in deleteStores()
+
+ private static TlsConfiguration truststoreTlsConfiguration; // built in setStores() from the generated truststore path, password and type only
+
+ private MockWebServer mockWebServer; // new instance per test, shut down in shutdownServer()
+
+ private TestRunner runner; // new InvokeHTTP runner per test, created in setRunner()
+
+ @BeforeClass
+ public static void setStores() throws IOException, GeneralSecurityException { // Prepares the two TLS configurations shared by all tests in this class
+ generatedTlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
+ truststoreTlsConfiguration = new StandardTlsConfiguration( // copies only the truststore settings of the generated configuration
+ null, // NOTE(review): the three nulls are presumably keystore path, password and type - confirm against the StandardTlsConfiguration constructor
+ null,
+ null,
+ generatedTlsConfiguration.getTruststorePath(),
+ generatedTlsConfiguration.getTruststorePassword(),
+ generatedTlsConfiguration.getTruststoreType()
+ );
+ }
+
+ @AfterClass
+ public static void deleteStores() throws IOException { // Deletes the keystore and truststore files referenced by the generated configuration
+ Files.deleteIfExists(Paths.get(generatedTlsConfiguration.getKeystorePath())); // NOTE(review): throws NullPointerException if setStores() failed before assigning the field
+ Files.deleteIfExists(Paths.get(generatedTlsConfiguration.getTruststorePath()));
+ }
+
+ @Before
+ public void setRunner() { // Creates a fresh MockWebServer and InvokeHTTP TestRunner before each test
+ mockWebServer = new MockWebServer();
+ runner = TestRunners.newTestRunner(new InvokeHTTP());
+ // Disable Connection Pooling
+ runner.setProperty(InvokeHTTP.PROP_MAX_IDLE_CONNECTIONS, Integer.toString(0));
+ }
+
+ @After
+ public void shutdownServer() throws IOException { // Shuts down the per-test MockWebServer after each test
+ mockWebServer.shutdown();
+ }
+
+ @Test
+ public void testNotValidWithDefaultProperties() { // Nothing set beyond the fixture in setRunner(): the processor must be invalid
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithProxyTypeInvalid() { // A URL plus an unrecognized Proxy Type value must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_TYPE, String.class.getSimpleName()); // "String" stands in for an arbitrary unsupported value
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithProxyHostWithoutProxyPort() { // Proxy Host set while Proxy Port is left unset must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, String.class.getSimpleName());
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithProxyUserWithoutProxyPassword() { // Proxy User set while Proxy Password is left unset must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_USER, String.class.getSimpleName());
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithProxyUserAndPasswordWithoutProxyHost() { // Proxy credentials set while Proxy Host is left unset must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_USER, String.class.getSimpleName());
+ runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, String.class.getSimpleName());
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithHttpsProxyTypeWithoutSslContextService() { // HTTPS Proxy Type with no SSL Context Service configured must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_TYPE, InvokeHTTP.HTTPS);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithPostFormPropertyWithoutFormBodyFormName() { // A post:form dynamic property with no Form Body Form Name set must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(POST_FORM_PARAMETER_KEY, String.class.getSimpleName());
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWithPostFormPropertyAndFormBodyFormNameWithoutSendBodyEnabled() { // Form parameter and form name with Send Body set to false must be invalid
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.setProperty(POST_FORM_PARAMETER_KEY, String.class.getSimpleName());
+ runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME, String.class.getSimpleName());
+ runner.setProperty(InvokeHTTP.PROP_SEND_BODY, Boolean.FALSE.toString());
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testValidWithMinimumProperties() { // Setting the URL on top of the fixture defaults is enough for a valid processor
+ runner.setProperty(InvokeHTTP.PROP_URL, HTTP_LOCALHOST_URL);
+ runner.assertValid();
+ }
+
+ @Test
+ public void testRunNoIncomingConnectionsWithNonLoopConnections() { // Runs with incoming connection disabled and non-loop connection enabled; only checks the queue stays empty
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(true);
+ setUrlProperty();
+
+ runner.run(); // no MockResponse is enqueued and nothing about a request is asserted
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testRunNoIncomingConnectionsPostMethod() { // Runs POST with no incoming and no non-loop connection; only checks the queue stays empty
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+ setUrlProperty();
+ runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
+
+ runner.run(); // no MockResponse is enqueued and nothing about a request is asserted
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testRunGetMalformedUrlExceptionFailureNoIncomingConnections() { // With no FlowFile to resolve the URL expression, expects at least one logged error
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+
+ runner.setProperty(InvokeHTTP.PROP_URL, "${file.name}"); // expression references an attribute that no FlowFile supplies in this test
+
+ runner.run();
+
+ final List<LogMessage> errorMessages = runner.getLogger().getErrorMessages();
+ assertFalse(errorMessages.isEmpty());
+ }
+
+ @Test
+ public void testRunGetMalformedUrlExceptionFailure() { // URL resolved from a FlowFile attribute is not a URL: expects a penalized Failure FlowFile
+ final String urlAttributeKey = "request.url";
+ runner.setProperty(InvokeHTTP.PROP_URL, String.format("${%s}", urlAttributeKey));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(urlAttributeKey, String.class.getSimpleName()); // "String" is the malformed URL value
+ runner.enqueue(FLOW_FILE_CONTENT, attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(InvokeHTTP.REL_FAILURE);
+ runner.assertPenalizeCount(1);
+
+ final MockFlowFile flowFile = getFailureFlowFile();
+ flowFile.assertAttributeEquals(InvokeHTTP.EXCEPTION_CLASS, MalformedURLException.class.getName());
+ flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_MESSAGE);
+ }
+
+ @Test
+ public void testRunGetMethodIllegalArgumentExceptionFailure() { // Method resolved from an attribute mapped to null: expects a penalized Failure FlowFile
+ setUrlProperty();
+ final String methodAttributeKey = "request.method";
+ runner.setProperty(InvokeHTTP.PROP_METHOD, String.format("${%s}", methodAttributeKey));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(methodAttributeKey, null); // deliberately no usable method value
+ runner.enqueue(FLOW_FILE_CONTENT, attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(InvokeHTTP.REL_FAILURE);
+ runner.assertPenalizeCount(1);
+
+ final MockFlowFile flowFile = getFailureFlowFile();
+ flowFile.assertAttributeEquals(InvokeHTTP.EXCEPTION_CLASS, IllegalArgumentException.class.getName());
+ flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_MESSAGE);
+ }
+
+ @Test
+ public void testRunGetHttp200Success() throws InterruptedException { // Method property is not set here; presumably GET is the processor default - confirm in InvokeHTTP
+ assertRequestMethodSuccess(GET_METHOD);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessIgnoreResponseContentEnabled() throws InterruptedException { // With Ignore Response Content enabled the Response FlowFile content must be empty
+ runner.setProperty(InvokeHTTP.IGNORE_RESPONSE_CONTENT, Boolean.TRUE.toString());
+ assertRequestMethodSuccess(GET_METHOD);
+
+ final MockFlowFile responseFlowFile = getResponseFlowFile();
+ assertEquals(StringUtils.EMPTY, responseFlowFile.getContent());
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessOutputBodyAttribute() { // Response body must be written to the configured attribute of the FlowFile in REL_SUCCESS_REQ
+ final String outputAttributeKey = String.class.getSimpleName();
+ runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, outputAttributeKey);
+ setUrlProperty();
+
+ final String body = String.class.getName();
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK).setBody(body));
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_SUCCESS_REQ, HTTP_OK);
+
+ final MockFlowFile flowFile = getRequestFlowFile();
+ flowFile.assertAttributeEquals(outputAttributeKey, body);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessOutputBodyAttributeNoIncomingConnections() { // Same as the output body attribute test, but with no incoming connection and no enqueued FlowFile
+ final String outputAttributeKey = String.class.getSimpleName();
+ runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, outputAttributeKey);
+ setUrlProperty();
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+
+ final String body = String.class.getName();
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK).setBody(body));
+ runner.run();
+
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_SUCCESS_REQ, HTTP_OK);
+
+ final MockFlowFile flowFile = getRequestFlowFile(); // shared helper reads the first FlowFile from REL_SUCCESS_REQ
+ flowFile.assertAttributeEquals(outputAttributeKey, body);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessNoIncomingConnections() { // With no incoming connection and no enqueued FlowFile, a 200 response still yields a Response FlowFile
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+ setUrlProperty();
+
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
+ runner.run();
+
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessProxyHostPortConfigured() throws InterruptedException { // Uses the MockWebServer itself as the proxy and checks the request line carries the full URL
+ final String mockWebServerUrl = getMockWebServerUrl();
+ final URI uri = URI.create(mockWebServerUrl);
+
+ runner.setProperty(InvokeHTTP.PROP_URL, mockWebServerUrl);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, uri.getHost()); // proxy host and port are those of the mock server
+ runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, Integer.toString(uri.getPort()));
+
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+ final RecordedRequest request = takeRequestCompleted();
+ final String requestLine = request.getRequestLine();
+
+ final String proxyRequestLine = String.format("%s %s HTTP/1.1", GET_METHOD, mockWebServerUrl); // expected line contains the absolute URL rather than only the path
+ assertEquals(proxyRequestLine, requestLine);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessProxyHostPortUserPasswordConfigured() throws InterruptedException { // Proxy test with Proxy User and Password also set; asserts only status and request line
+ final String mockWebServerUrl = getMockWebServerUrl();
+ final URI uri = URI.create(mockWebServerUrl);
+
+ runner.setProperty(InvokeHTTP.PROP_URL, mockWebServerUrl);
+ runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, uri.getHost()); // proxy host and port are those of the mock server
+ runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, Integer.toString(uri.getPort()));
+ runner.setProperty(InvokeHTTP.PROP_PROXY_USER, String.class.getSimpleName());
+ runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, String.class.getName());
+
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+ final RecordedRequest request = takeRequestCompleted();
+ final String requestLine = request.getRequestLine();
+
+ final String proxyRequestLine = String.format("%s %s HTTP/1.1", GET_METHOD, mockWebServerUrl); // expected line contains the absolute URL rather than only the path
+ assertEquals(proxyRequestLine, requestLine); // no proxy credential header is checked here
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessContentTypeHeaderMimeType() { // Response Content-Type header must become the mime.type attribute of the Response FlowFile
+ final MockResponse response = new MockResponse().setResponseCode(HTTP_OK).setHeader(CONTENT_TYPE_HEADER, TEXT_PLAIN);
+ mockWebServer.enqueue(response);
+
+ setUrlProperty();
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final MockFlowFile responseFlowFile = getResponseFlowFile();
+ responseFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), TEXT_PLAIN);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessRequestDateHeader() throws InterruptedException { // With the Date header property enabled, the request must carry a parseable GMT Date header
+ runner.setProperty(InvokeHTTP.PROP_DATE_HEADER, StringUtils.capitalize(Boolean.TRUE.toString())); // property value is "True"
+
+ enqueueResponseCodeAndRun(HTTP_OK);
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String dateHeader = request.getHeader(DATE_HEADER);
+ assertNotNull("Request Date not found", dateHeader);
+
+ final Pattern rfcDatePattern = Pattern.compile("^.+? \\d{4} \\d{2}:\\d{2}:\\d{2} GMT$"); // loose shape check: four-digit year, HH:mm:ss, literal GMT suffix
+ assertTrue("Request Date RFC 2616 not matched", rfcDatePattern.matcher(dateHeader).matches());
+
+ final ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateHeader, DateTimeFormatter.RFC_1123_DATE_TIME); // strict check: parse throws on an invalid value
+ assertNotNull("Request Date Parsing Failed", zonedDateTime);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessSendAttributesAndDynamicProperties() throws InterruptedException { // Two runs: headers are sent while the properties are set, and absent after they are removed
+ runner.setProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, String.format("^%s$", ACCEPT_HEADER)); // regular expression matching only the Accept attribute
+ final String defaultContentTypeHeader = "Default-Content-Type";
+ runner.setProperty(defaultContentTypeHeader, InvokeHTTP.DEFAULT_CONTENT_TYPE); // dynamic property expected to be sent as a request header
+ setUrlProperty();
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ACCEPT_HEADER, TEXT_PLAIN);
+ runner.enqueue(FLOW_FILE_CONTENT, attributes);
+ runner.run();
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String acceptHeader = request.getHeader(ACCEPT_HEADER);
+ assertEquals(TEXT_PLAIN, acceptHeader);
+
+ final String contentType = request.getHeader(defaultContentTypeHeader);
+ assertEquals(InvokeHTTP.DEFAULT_CONTENT_TYPE, contentType);
+
+ runner.removeProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND); // second run: same FlowFile attributes, but neither property configured
+ runner.removeProperty(defaultContentTypeHeader);
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
+ runner.enqueue(FLOW_FILE_CONTENT, attributes);
+ runner.run();
+
+ final RecordedRequest secondRequest = takeRequestCompleted();
+ assertNull("Accept Header found", secondRequest.getHeader(ACCEPT_HEADER));
+ assertNull("Default-Content-Type Header found", secondRequest.getHeader(defaultContentTypeHeader));
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessResponseHeaderRequestFlowFileAttributes() { // With Add Headers to Request enabled, response headers must appear as attributes on the request FlowFile
+ setUrlProperty();
+ runner.setProperty(InvokeHTTP.PROP_ADD_HEADERS_TO_REQUEST, Boolean.TRUE.toString());
+
+ final String firstHeader = String.class.getSimpleName();
+ final String secondHeader = Integer.class.getSimpleName();
+ final MockResponse response = new MockResponse()
+ .setResponseCode(HTTP_OK)
+ .addHeader(REPEATED_HEADER, firstHeader) // same header name added twice
+ .addHeader(REPEATED_HEADER, secondHeader);
+
+ mockWebServer.enqueue(response);
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final MockFlowFile requestFlowFile = getRequestFlowFile();
+ requestFlowFile.assertAttributeEquals(CONTENT_LENGTH_HEADER, Integer.toString(0)); // the mock response has no body
+
+ final String repeatedHeaders = String.format("%s, %s", firstHeader, secondHeader); // repeated values are expected joined with a comma and a space
+ requestFlowFile.assertAttributeEquals(REPEATED_HEADER, repeatedHeaders);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessCacheTagEnabled() throws InterruptedException { // Enables the ETag property and checks a plain GET still succeeds; no caching behavior is asserted
+ runner.setProperty(InvokeHTTP.PROP_USE_ETAG, Boolean.TRUE.toString());
+
+ assertRequestMethodSuccess(GET_METHOD);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessBasicAuthentication() throws InterruptedException { // With username and password set, the request must carry a Basic Authorization header
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, String.class.getSimpleName());
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, String.class.getName());
+
+ enqueueResponseCodeAndRun(HTTP_OK);
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String authorization = request.getHeader(AUTHORIZATION_HEADER);
+ assertNotNull("Authorization Header not found", authorization);
+
+ final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$"); // checks only the scheme and a non-blank token, not the encoded credentials
+ assertTrue("Basic Authentication not matched", basicAuthPattern.matcher(authorization).matches());
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessDigestAuthentication() throws InterruptedException { // 401 challenge followed by 200: the second request must answer the Digest challenge
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, String.class.getSimpleName());
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, String.class.getName());
+ runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH, Boolean.TRUE.toString());
+
+ final String realm = UUID.randomUUID().toString(); // random values so the later contains() checks cannot match by accident
+ final String nonce = UUID.randomUUID().toString();
+ final String digestHeader = String.format("Digest realm=\"%s\", nonce=\"%s\"", realm, nonce);
+
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_UNAUTHORIZED).setHeader(AUTHENTICATE_HEADER, digestHeader)); // first response: the challenge
+ enqueueResponseCodeAndRun(HTTP_OK); // second response: success
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ assertNull("Authorization Header found", request.getHeader(AUTHORIZATION_HEADER)); // first recorded request must not carry credentials
+
+ final RecordedRequest authenticatedRequest = takeRequestCompleted();
+ final String authorization = authenticatedRequest.getHeader(AUTHORIZATION_HEADER);
+ assertNotNull("Authorization Header not found", authorization);
+ assertTrue("Digest Realm not found", authorization.contains(realm));
+ assertTrue("Digest Nonce not found", authorization.contains(nonce));
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessSslContextServiceServerTrusted() throws InitializationException, GeneralSecurityException { // Server uses the generated stores, client only the truststore configuration
+ assertResponseSuccessSslContextConfigured(generatedTlsConfiguration, truststoreTlsConfiguration);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessSslContextServiceMutualTrusted() throws InitializationException, GeneralSecurityException { // Server and client both use the full generated configuration
+ assertResponseSuccessSslContextConfigured(generatedTlsConfiguration, generatedTlsConfiguration);
+ }
+
+ @Test
+ public void testRunGetSslContextServiceMutualTrustedClientCertificateMissing() throws InitializationException, GeneralSecurityException { // Server requires client auth but the client has only a truststore: expects Failure
+ runner.setProperty(InvokeHTTP.DISABLE_HTTP2_PROTOCOL, StringUtils.capitalize(Boolean.TRUE.toString())); // property value is "True"
+ setSslContextConfiguration(generatedTlsConfiguration, truststoreTlsConfiguration);
+ mockWebServer.requireClientAuth();
+
+ setUrlProperty();
+ runner.enqueue(FLOW_FILE_CONTENT); // no MockResponse is enqueued: the request is expected to fail before one is served
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(InvokeHTTP.REL_FAILURE);
+ final MockFlowFile flowFile = getFailureFlowFile();
+ flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_CLASS); // exception type is not pinned, only its presence
+ flowFile.assertAttributeExists(InvokeHTTP.EXCEPTION_MESSAGE);
+ }
+
+ @Test
+ public void testRunGetHttp200SuccessUserAgentConfigured() throws InterruptedException { // The configured User Agent value must be sent unchanged as the User-Agent header
+ final String userAgent = UUID.randomUUID().toString();
+ runner.setProperty(InvokeHTTP.PROP_USERAGENT, userAgent);
+
+ enqueueResponseCodeAndRun(HTTP_OK);
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String userAgentHeader = request.getHeader(USER_AGENT_HEADER);
+ assertEquals(userAgent, userAgentHeader);
+ }
+
+ @Test
+ public void testRunGetHttp302NoRetryFollowRedirectsDefaultEnabled() { // Follow Redirects is left unset: a 302 pointing back at the server must end with the 200 response
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_MOVED_TEMP).setHeader(LOCATION_HEADER, getMockWebServerUrl())); // first response: the redirect
+ enqueueResponseCodeAndRun(HTTP_OK); // second response: served for the redirected request
+
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+ }
+
+ @Test
+ public void testRunGetHttp302NoRetryFollowRedirectsDisabled() { // With Follow Redirects set to "False", a 302 must be routed to No Retry
+ runner.setProperty(InvokeHTTP.PROP_FOLLOW_REDIRECTS, StringUtils.capitalize(Boolean.FALSE.toString()));
+ enqueueResponseCodeAndRun(HTTP_MOVED_TEMP);
+
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_NO_RETRY, HTTP_MOVED_TEMP);
+ }
+
+ @Test
+ public void testRunGetHttp400NoRetryMinimumProperties() { // A 400 response must be routed to No Retry with no Response or Failure FlowFile
+ enqueueResponseCodeAndRun(HTTP_BAD_REQUEST);
+
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_NO_RETRY, HTTP_BAD_REQUEST);
+ }
+
+ @Test
+ public void testRunGetHttp400NoRetryPenalizeNoRetry() { // With Penalize on No Retry enabled, the 400 FlowFile must also be penalized once
+ runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, Boolean.TRUE.toString());
+
+ enqueueResponseCodeAndRun(HTTP_BAD_REQUEST);
+
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertPenalizeCount(1);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_NO_RETRY, HTTP_BAD_REQUEST);
+ }
+
+ @Test
+ public void testRunGetHttp500RetryMinimumProperties() { // A 500 response must be routed to Retry with no Response or Failure FlowFile
+ enqueueResponseCodeAndRun(HTTP_INTERNAL_ERROR);
+
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RETRY, HTTP_INTERNAL_ERROR);
+ }
+
+ @Test
+ public void testRunGetHttp500RetryOutputResponseRegardless() { // With Output Response Regardless enabled, a 500 must produce both a Retry and a Response FlowFile
+ runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS, Boolean.TRUE.toString());
+
+ enqueueResponseCodeAndRun(HTTP_INTERNAL_ERROR);
+
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RETRY, HTTP_INTERNAL_ERROR);
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_INTERNAL_ERROR);
+ }
+
+ @Test
+ public void testRunDeleteHttp200Success() throws InterruptedException { // Configures DELETE and checks the recorded request used it
+ runner.setProperty(InvokeHTTP.PROP_METHOD, DELETE_METHOD);
+ assertRequestMethodSuccess(DELETE_METHOD);
+ }
+
+ @Test
+ public void testRunHeadHttp200Success() throws InterruptedException { // Configures HEAD and checks the recorded request used it
+ runner.setProperty(InvokeHTTP.PROP_METHOD, HEAD_METHOD);
+ assertRequestMethodSuccess(HEAD_METHOD);
+ }
+
+ @Test
+ public void testRunOptionsHttp200Success() throws InterruptedException { // Configures OPTIONS and checks the recorded request used it
+ runner.setProperty(InvokeHTTP.PROP_METHOD, OPTIONS_METHOD);
+ assertRequestMethodSuccess(OPTIONS_METHOD);
+ }
+
+ @Test
+ public void testRunPatchHttp200Success() throws InterruptedException { // Configures PATCH and checks the recorded request used it
+ runner.setProperty(InvokeHTTP.PROP_METHOD, PATCH_METHOD);
+ assertRequestMethodSuccess(PATCH_METHOD);
+ }
+
+ @Test
+ public void testRunPostHttp200Success() throws InterruptedException { // Configures POST and checks the recorded request used it
+ runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
+ assertRequestMethodSuccess(POST_METHOD);
+ }
+
+ @Test
+ public void testRunPostHttp200SuccessChunkedEncoding() throws InterruptedException { // With chunked encoding enabled, POST must send Transfer-Encoding: chunked and no Content-Length
+ runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
+ runner.setProperty(InvokeHTTP.PROP_USE_CHUNKED_ENCODING, Boolean.TRUE.toString());
+
+ enqueueResponseCodeAndRun(HTTP_OK);
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String contentLength = request.getHeader(CONTENT_LENGTH_HEADER);
+ assertNull("Content-Length Request Header found", contentLength);
+
+ final String transferEncoding = request.getHeader(TRANSFER_ENCODING_HEADER);
+ assertEquals("chunked", transferEncoding);
+ }
+
+ @Test
+ public void testRunPostHttp200SuccessFormData() throws InterruptedException { // POST with a form name and one form parameter must send a multipart/form-data body containing the value
+ runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
+
+ final String formName = "multipart-form";
+ runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME, formName);
+
+ final String formDataParameter = String.class.getName();
+ final String formDataParameterName = "label";
+ final String formDataPropertyName = String.format("%s:%s", InvokeHTTP.FORM_BASE, formDataParameterName); // dynamic property name built as FORM_BASE, a colon, then the parameter name
+ runner.setProperty(formDataPropertyName, formDataParameter);
+
+ setUrlProperty();
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ final String contentType = request.getHeader(CONTENT_TYPE_HEADER);
+ assertNotNull("Content Type not found", contentType);
+
+ final Pattern multipartPattern = Pattern.compile("^multipart/form-data.+$"); // the trailing part (such as a boundary) is not inspected
+ assertTrue("Content Type not matched", multipartPattern.matcher(contentType).matches());
+
+ final String body = request.getBody().readUtf8();
+ assertTrue("Form Data Parameter not found", body.contains(formDataParameter)); // checks the parameter value only, not the form or parameter names
+ }
+
+ @Test
+ public void testRunPutHttp200Success() throws InterruptedException { // Configures PUT and checks the recorded request used it
+ runner.setProperty(InvokeHTTP.PROP_METHOD, PUT_METHOD);
+ assertRequestMethodSuccess(PUT_METHOD);
+ }
+
+ private void setUrlProperty() { // Points the processor URL property at the MockWebServer
+ runner.setProperty(InvokeHTTP.PROP_URL, getMockWebServerUrl());
+ }
+
+ private String getMockWebServerUrl() { // Returns the MockWebServer base URL with the host replaced by "localhost"
+ return mockWebServer.url(BASE_PATH).newBuilder().host(LOCALHOST).build().toString(); // NOTE(review): url() presumably starts the server if it is not yet running - confirm in MockWebServer docs
+ }
+
+ private void enqueueResponseCodeAndRun(final int responseCode) { // Sets the URL, queues one empty response with the given status, enqueues one FlowFile and runs the processor
+ setUrlProperty();
+ mockWebServer.enqueue(new MockResponse().setResponseCode(responseCode)); // added after any responses the caller queued earlier
+ runner.enqueue(FLOW_FILE_CONTENT);
+ runner.run();
+ }
+
+ private RecordedRequest takeRequestCompleted() throws InterruptedException { // Returns the next recorded request, failing the test if none arrives within the timeout
+ final RecordedRequest request = mockWebServer.takeRequest(TAKE_REQUEST_COMPLETED_TIMEOUT, TimeUnit.SECONDS); // bounded wait so a missing request cannot hang the test
+ assertNotNull("Request not found", request);
+ return request;
+ }
+
+ private MockFlowFile getFailureFlowFile() { // Returns the first FlowFile transferred to REL_FAILURE
+ return runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).iterator().next(); // throws NoSuchElementException when the relationship is empty
+ }
+
+ private MockFlowFile getRequestFlowFile() { // Returns the first FlowFile transferred to REL_SUCCESS_REQ
+ return runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).iterator().next(); // throws NoSuchElementException when the relationship is empty
+ }
+
+ private MockFlowFile getResponseFlowFile() { // Returns the first FlowFile transferred to REL_RESPONSE
+ return runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).iterator().next(); // throws NoSuchElementException when the relationship is empty
+ }
+
+ private void assertRequestMethodSuccess(final String method) throws InterruptedException { // Runs one request against a 200 response and asserts success routing plus the recorded HTTP method
+ enqueueResponseCodeAndRun(HTTP_OK); // callers set InvokeHTTP.PROP_METHOD beforehand when they need a specific method
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final RecordedRequest request = takeRequestCompleted();
+ assertEquals(method, request.getMethod());
+ }
+
+ private void assertRelationshipStatusCodeEquals(final Relationship relationship, final int statusCode) { // Asserts the relationship is not empty and its first FlowFile has the expected status attributes
+ final List<MockFlowFile> responseFlowFiles = runner.getFlowFilesForRelationship(relationship);
+ final String message = String.format("FlowFiles not found for Relationship [%s]", relationship);
+ assertFalse(message, responseFlowFiles.isEmpty());
+ final MockFlowFile responseFlowFile = responseFlowFiles.iterator().next(); // only the first FlowFile is inspected
+ assertStatusCodeEquals(responseFlowFile, statusCode);
+ }
+
+ private void assertStatusCodeEquals(final MockFlowFile flowFile, final int statusCode) { // Asserts the status code value and the presence of status message, transaction id and request URL attributes
+ flowFile.assertAttributeEquals(InvokeHTTP.STATUS_CODE, Integer.toString(statusCode));
+ flowFile.assertAttributeExists(InvokeHTTP.STATUS_MESSAGE);
+ flowFile.assertAttributeExists(InvokeHTTP.TRANSACTION_ID);
+ flowFile.assertAttributeExists(InvokeHTTP.REQUEST_URL);
+ }
+
+ private void assertResponseSuccessRelationships() {
+ final List<LogMessage> errorMessages = runner.getLogger().getErrorMessages();
+ final Optional<LogMessage> errorMessage = errorMessages.stream().findFirst();
+ if (errorMessage.isPresent()) {
+ final String message = String.format("Error Message Logged: %s", errorMessage.get().getMsg());
+ assertFalse(message, errorMessages.isEmpty());
+ }
+
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ }
+
+ private void assertResponseSuccessSslContextConfigured(final TlsConfiguration serverTlsConfiguration, final TlsConfiguration clientTlsConfiguration) throws InitializationException, TlsException { // Runs one TLS request with the given server and client configurations and asserts success plus a remote DN attribute
+ setSslContextConfiguration(serverTlsConfiguration, clientTlsConfiguration);
+ enqueueResponseCodeAndRun(HTTP_OK);
+
+ assertResponseSuccessRelationships();
+ assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);
+
+ final MockFlowFile flowFile = getResponseFlowFile(); // shared helper reads the first FlowFile from REL_RESPONSE
+ flowFile.assertAttributeExists(InvokeHTTP.REMOTE_DN);
+ }
+
+ private void setSslContextConfiguration(final TlsConfiguration serverTlsConfiguration, final TlsConfiguration clientTlsConfiguration) throws InitializationException, TlsException { // Enables HTTPS on the MockWebServer and stubs the mocked SSL Context Service with the client configuration
+ final SSLContextService sslContextService = setSslContextService();
+ final SSLContext serverSslContext = getSslContext(serverTlsConfiguration);
+ setMockWebServerSslSocketFactory(serverSslContext);
+
+ final SSLContext clientSslContext = getSslContext(clientTlsConfiguration);
+ when(sslContextService.createContext()).thenReturn(clientSslContext); // the mock returns the client context and configuration when asked
+ when(sslContextService.createTlsConfiguration()).thenReturn(clientTlsConfiguration);
+ }
+
+ private SSLContextService setSslContextService() throws InitializationException { // Registers and enables a mocked SSLContextService on the runner and returns it for further stubbing
+ final String serviceIdentifier = SSLContextService.class.getName();
+ final SSLContextService sslContextService = mock(SSLContextService.class);
+ when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+
+ runner.addControllerService(serviceIdentifier, sslContextService);
+ runner.enableControllerService(sslContextService);
+ runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier);
+ runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, TLS_CONNECTION_TIMEOUT); // both timeouts are set to 60 s whenever TLS is configured
+ runner.setProperty(InvokeHTTP.PROP_CONNECT_TIMEOUT, TLS_CONNECTION_TIMEOUT);
+ return sslContextService;
+ }
+
+ private void setMockWebServerSslSocketFactory(final SSLContext sslContext) { // Switches the MockWebServer to HTTPS using the socket factory of the given context
+ final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ if (sslSocketFactory == null) {
+ throw new IllegalArgumentException("Socket Factory not found");
+ }
+ mockWebServer.useHttps(sslSocketFactory, false); // NOTE(review): second argument is presumably tunnelProxy=false - confirm against MockWebServer.useHttps
+ }
+
+ private SSLContext getSslContext(final TlsConfiguration configuration) throws TlsException { // Builds an SSLContext from the configuration, rejecting a null result
+ final SSLContext sslContext = SslContextFactory.createSslContext(configuration);
+ if (sslContext == null) {
+ throw new IllegalArgumentException("SSLContext not found for TLS Configuration");
+ }
+ return sslContext;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
deleted file mode 100644
index 0a46bbe..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.lang.reflect.Field;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import javax.net.ssl.SSLContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
-import org.apache.nifi.security.util.KeyStoreUtils;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.security.util.TlsConfiguration;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunners;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class TestInvokeHTTP extends TestInvokeHttpCommon {
- private static final Logger logger = LoggerFactory.getLogger(TestInvokeHTTP.class);
-
- private static TlsConfiguration tlsConfiguration;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // generate new keystore and truststore
- tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
- configureServer(null, null);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- if (tlsConfiguration != null) {
- try {
- if (StringUtils.isNotBlank(tlsConfiguration.getKeystorePath())) {
- Files.deleteIfExists(Paths.get(tlsConfiguration.getKeystorePath()));
- }
- } catch (IOException e) {
- throw new IOException("There was an error deleting a keystore: " + e.getMessage(), e);
- }
-
- try {
- if (StringUtils.isNotBlank(tlsConfiguration.getTruststorePath())) {
- Files.deleteIfExists(Paths.get(tlsConfiguration.getTruststorePath()));
- }
- } catch (IOException e) {
- throw new IOException("There was an error deleting a truststore: " + e.getMessage(), e);
- }
- }
- }
-
- @Before
- public void before() throws Exception {
- runner = TestRunners.newTestRunner(InvokeHTTP.class);
- }
-
- @Test
- public void testSslSetHttpRequest() throws Exception {
- final String serviceIdentifier = SSLContextService.class.getName();
- final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
- Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
- final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
- Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
- Mockito.when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration);
-
- runner = TestRunners.newTestRunner(InvokeHTTP.class);
-
- runner.addControllerService(serviceIdentifier, sslContextService);
- runner.enableControllerService(sslContextService);
- runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier);
-
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes(StandardCharsets.UTF_8));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- // Currently InvokeHttp does not support Proxy via Https
- @Test
- public void testProxy() throws Exception {
- addHandler(new MyProxyHandler());
- URL proxyURL = new URL(url);
-
- runner.setVariable("proxy.host", proxyURL.getHost());
- runner.setVariable("proxy.port", String.valueOf(proxyURL.getPort()));
- runner.setVariable("proxy.username", "username");
- runner.setVariable("proxy.password", "password");
-
- runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
- runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, "${proxy.host}");
-
- try {
- runner.run();
- Assert.fail();
- } catch (AssertionError e) {
- // Expect assertion error when proxy port isn't set but host is.
- }
- runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, "${proxy.port}");
-
- runner.setProperty(InvokeHTTP.PROP_PROXY_USER, "${proxy.username}");
-
- try {
- runner.run();
- Assert.fail();
- } catch (AssertionError e) {
- // Expect assertion error when proxy password isn't set but host is.
- }
- runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, "${proxy.password}");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- //expected in request status.code and status.message
- //original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes(StandardCharsets.UTF_8));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("http://nifi.apache.org/".getBytes(StandardCharsets.UTF_8));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testFailingHttpRequest() throws Exception {
-
- runner = TestRunners.newTestRunner(InvokeHTTP.class);
-
- // Remember: we expect that connecting to the following URL should raise a Java exception
- runner.setProperty(InvokeHTTP.PROP_URL, "http://127.0.0.1:0");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- runner.assertPenalizeCount(1);
-
- // expected in request java.exception
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.EXCEPTION_CLASS, "java.lang.IllegalArgumentException");
-
- }
-
- public static class MyProxyHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
- baseRequest.setHandled(true);
-
- if ("Get".equalsIgnoreCase(request.getMethod())) {
- response.setStatus(200);
- String proxyPath = baseRequest.getHttpURI().toString();
- response.setContentLength(proxyPath.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(proxyPath);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
- }
- }
-
- @Test
- public void testOnPropertyModified() throws Exception {
- final InvokeHTTP processor = new InvokeHTTP();
- final Field regexAttributesToSendField = InvokeHTTP.class.getDeclaredField("regexAttributesToSend");
- regexAttributesToSendField.setAccessible(true);
-
- assertNull(regexAttributesToSendField.get(processor));
-
- // Set Attributes to Send.
- processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, null, "uuid");
- assertNotNull(regexAttributesToSendField.get(processor));
-
- // Null clear Attributes to Send. NIFI-1125: Throws NullPointerException.
- processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "uuid", null);
- assertNull(regexAttributesToSendField.get(processor));
-
- // Set Attributes to Send.
- processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, null, "uuid");
- assertNotNull(regexAttributesToSendField.get(processor));
-
- // Clear Attributes to Send with empty string.
- processor.onPropertyModified(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "uuid", "");
- assertNull(regexAttributesToSendField.get(processor));
-
- }
-
- @Test
- public void testEmptyGzipHttpReponse() throws Exception {
- addHandler(new EmptyGzipResponseHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url);
- runner.setProperty(InvokeHTTP.IGNORE_RESPONSE_CONTENT, "true");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- //expected empty content in response FlowFile
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle.assertContentEquals(new byte[0]);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
- bundle.assertAttributeEquals("Content-Type", "text/plain");
- }
-
-
- @Test
- public void testShouldAllowExtension() {
- // Arrange
- class ExtendedInvokeHTTP extends InvokeHTTP {
- private final int extendedNumber;
-
- public ExtendedInvokeHTTP(int num) {
- super();
- this.extendedNumber = num;
- }
-
- public int extendedMethod() {
- return this.extendedNumber;
- }
- }
-
- int num = Double.valueOf(Math.random() * 100).intValue();
-
- // Act
- ExtendedInvokeHTTP eih = new ExtendedInvokeHTTP(num);
-
- // Assert
- assertEquals(num, eih.extendedMethod());
- }
-
- public static class EmptyGzipResponseHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) {
- baseRequest.setHandled(true);
- response.setStatus(200);
- response.setContentLength(0);
- response.setContentType("text/plain");
- response.setHeader("Content-Encoding", "gzip");
- }
-
- }
-
- @Test
- public void testShouldNotSendUserAgentByDefault() throws Exception {
- // Arrange
- addHandler(new EchoUserAgentHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url);
-
- createFlowFiles(runner);
-
- // Act
- runner.run();
-
- // Assert
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- String content = new String(response.toByteArray(), UTF_8);
- logger.info("Returned flowfile content: " + content);
- assertTrue(content.isEmpty());
-
- response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- }
-
- @Test
- public void testShouldSetUserAgentExplicitly() throws Exception {
- addHandler(new EchoUserAgentHandler());
-
- runner.setProperty(InvokeHTTP.PROP_USERAGENT, "Apache NiFi For The Win");
- runner.setProperty(InvokeHTTP.PROP_URL, url);
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- String content = new String(response.toByteArray(), UTF_8);
- assertTrue(content.startsWith("Apache NiFi For The Win"));
-
- response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- }
-
- @Test
- public void testShouldSetUserAgentWithExpressionLanguage() throws Exception {
- addHandler(new EchoUserAgentHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url);
- runner.setProperty(InvokeHTTP.PROP_USERAGENT, "${literal('And now for something completely different...')}");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile response = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- // One check to verify a custom value and that the expression language actually works.
- response.assertContentEquals("And now for something completely different...");
-
- response.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- response.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- }
-
- public static class EchoUserAgentHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
- baseRequest.setHandled(true);
-
- if ("Get".equalsIgnoreCase(request.getMethod())) {
- response.setStatus(200);
- String useragent = request.getHeader("User-agent");
- response.setContentLength(useragent.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(useragent);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
deleted file mode 100644
index 67c6e77..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import javax.net.ssl.SSLContext;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
-import org.apache.nifi.security.util.ClientAuth;
-import org.apache.nifi.security.util.KeyStoreUtils;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.security.util.StandardTlsConfiguration;
-import org.apache.nifi.security.util.TlsConfiguration;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-/**
- * Executes the same tests as TestInvokeHttp but with one-way SSL enabled. The Jetty server created for these tests
- * will not require client certificates and the client will not use keystore properties in the SSLContextService.
- */
-public class TestInvokeHttpSSL extends TestInvokeHttpCommon {
-
- private static final String HTTP_CONNECT_TIMEOUT = "30 s";
- private static final String HTTP_READ_TIMEOUT = "30 s";
-
- protected static TlsConfiguration serverConfiguration;
-
- private static SSLContext truststoreSslContext;
- private static TlsConfiguration truststoreConfiguration;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // generate new keystore and truststore
- serverConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
-
- truststoreConfiguration = new StandardTlsConfiguration(
- null,
- null,
- null,
- serverConfiguration.getTruststorePath(),
- serverConfiguration.getTruststorePassword(),
- serverConfiguration.getTruststoreType()
- );
-
- final SSLContext serverContext = SslContextFactory.createSslContext(serverConfiguration);
- configureServer(serverContext, ClientAuth.NONE);
- truststoreSslContext = SslContextFactory.createSslContext(truststoreConfiguration);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- if (serverConfiguration != null) {
- try {
- if (StringUtils.isNotBlank(serverConfiguration.getKeystorePath())) {
- Files.deleteIfExists(Paths.get(serverConfiguration.getKeystorePath()));
- }
- } catch (IOException e) {
- throw new IOException("There was an error deleting a keystore: " + e.getMessage(), e);
- }
-
- try {
- if (StringUtils.isNotBlank(serverConfiguration.getTruststorePath())) {
- Files.deleteIfExists(Paths.get(serverConfiguration.getTruststorePath()));
- }
- } catch (IOException e) {
- throw new IOException("There was an error deleting a truststore: " + e.getMessage(), e);
- }
- }
- }
-
- @Before
- public void before() throws Exception {
- final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
- final String serviceIdentifier = SSLContextService.class.getName();
-
- Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
- Mockito.when(sslContextService.createContext()).thenReturn(getClientSslContext());
- Mockito.when(sslContextService.createTlsConfiguration()).thenReturn(getClientConfiguration());
-
- runner = TestRunners.newTestRunner(InvokeHTTP.class);
- runner.addControllerService(serviceIdentifier, sslContextService);
- runner.enableControllerService(sslContextService);
-
- runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, serviceIdentifier);
- runner.setProperty(InvokeHTTP.PROP_CONNECT_TIMEOUT, HTTP_CONNECT_TIMEOUT);
- runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, HTTP_READ_TIMEOUT);
- }
-
- protected SSLContext getClientSslContext() {
- return truststoreSslContext;
- }
-
- protected TlsConfiguration getClientConfiguration() {
- return truststoreConfiguration;
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpTwoWaySSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpTwoWaySSL.java
deleted file mode 100644
index a068437..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpTwoWaySSL.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import javax.net.ssl.SSLContext;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.security.util.ClientAuth;
-import org.apache.nifi.security.util.KeyStoreUtils;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.security.util.TlsConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * This is probably overkill but in keeping with the same pattern as the TestInvokeHttp and TestInvokeHttpSSL class,
- * we will execute the same tests using two-way SSL. The Jetty server created for these tests will require client
- * certificates and the client will utilize keystore properties in the SSLContextService.
- */
-public class TestInvokeHttpTwoWaySSL extends TestInvokeHttpSSL {
-
- private static TlsConfiguration serverConfig;
- private static SSLContext clientSslContext;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- // generate new keystore and truststore
- serverConfig = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
-
- final SSLContext serverContext = SslContextFactory.createSslContext(serverConfig);
- configureServer(serverContext, ClientAuth.REQUIRED);
- clientSslContext = SslContextFactory.createSslContext(serverConfig);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- if (serverConfig != null) {
- try {
- if (StringUtils.isNotBlank(serverConfig.getKeystorePath())) {
- Files.deleteIfExists(Paths.get(serverConfig.getKeystorePath()));
- }
- } catch (IOException e) {
- throw new IOException("There was an error deleting a keystore: " + e.getMessage(), e);
- }
-
- try {
- if (StringUtils.isNotBlank(serverConfig.getTruststorePath())) {
- Files.deleteIfExists(Paths.get(serverConfig.getTruststorePath()));
- }
- } catch (IOException e) {
- throw new IOException("There was an error deleting a truststore: " + e.getMessage(), e);
- }
- }
- }
-
- @Override
- protected SSLContext getClientSslContext() {
- return clientSslContext;
- }
-
- @Override
- protected TlsConfiguration getClientConfiguration() {
- return serverConfig;
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
index 22cf181..f56ccd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java
@@ -17,8 +17,6 @@
package org.apache.nifi.processors.standard;
import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -28,12 +26,12 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection;
@@ -58,7 +56,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.KeyStoreUtils;
-import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
@@ -81,7 +78,6 @@ import org.mockito.Mockito;
import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
public class TestListenHTTP {
@@ -95,22 +91,12 @@ public class TestListenHTTP {
private final static String BASEPATH_VARIABLE = "HTTP_BASEPATH";
private final static String HTTP_SERVER_BASEPATH_EL = "${" + BASEPATH_VARIABLE + "}";
-
- private static final String KEYSTORE = "src/test/resources/keystore.jks";
- private static final String KEYSTORE_PASSWORD = "passwordpassword";
- private static final KeystoreType KEYSTORE_TYPE = KeystoreType.JKS;
- private static final String TRUSTSTORE = "src/test/resources/truststore.jks";
- private static final String TRUSTSTORE_PASSWORD = "passwordpassword";
- private static final KeystoreType TRUSTSTORE_TYPE = KeystoreType.JKS;
- private static final String CLIENT_KEYSTORE = "src/test/resources/client-keystore.p12";
- private static final KeystoreType CLIENT_KEYSTORE_TYPE = KeystoreType.PKCS12;
+ private static final String MULTIPART_ATTRIBUTE = "http.multipart.name";
private static final String TLS_1_3 = "TLSv1.3";
private static final String TLS_1_2 = "TLSv1.2";
private static final String LOCALHOST = "localhost";
- private static final long SEND_REQUEST_SLEEP = 150;
- private static final long RESPONSE_TIMEOUT = 1200000;
private static final int SOCKET_CONNECT_TIMEOUT = 100;
private static final long SERVER_START_TIMEOUT = 1200000;
@@ -217,9 +203,8 @@ public class TestListenHTTP {
}
@After
- public void teardown() {
+ public void shutdownServer() { // @After hook: only shuts down the processor's HTTP server; the old text-file cleanup is dropped
proc.shutdownHttpServer();
- new File("my-file-text.txt").delete();
}
@Test
@@ -484,15 +469,18 @@ public class TestListenHTTP {
return connection;
}
- private int executePOST(String message, boolean secure, boolean twoWaySsl) throws Exception {
+ private int postMessage(String message, boolean secure, boolean clientAuthRequired) throws Exception {
String endpointUrl = buildUrl(secure);
final URL url = new URL(endpointUrl);
- HttpURLConnection connection;
-
- if (secure) {
- connection = buildSecureConnection(twoWaySsl, url);
- } else {
- connection = (HttpURLConnection) url.openConnection();
+ final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+
+ if (connection instanceof HttpsURLConnection) {
+ final HttpsURLConnection httpsConnection = (HttpsURLConnection) connection;
+ if (clientAuthRequired) {
+ httpsConnection.setSSLSocketFactory(keyStoreSslContext.getSocketFactory());
+ } else {
+ httpsConnection.setSSLSocketFactory(trustStoreSslContext.getSocketFactory());
+ }
}
connection.setRequestMethod(HTTP_POST_METHOD);
connection.setDoOutput(true);
@@ -507,18 +495,6 @@ public class TestListenHTTP {
return connection.getResponseCode();
}
- private static HttpsURLConnection buildSecureConnection(boolean twoWaySsl, URL url) throws IOException {
- final HttpsURLConnection connection = (HttpsURLConnection) url.openConnection();
- if (twoWaySsl) {
- // Use a client certificate, do not reuse the server's keystore
- connection.setSSLSocketFactory(keyStoreSslContext.getSocketFactory());
- } else {
- // With one-way SSL, the client still needs a truststore
- connection.setSSLSocketFactory(trustStoreSslContext.getSocketFactory());
- }
- return connection;
- }
-
private String buildUrl(final boolean secure) {
return String.format("%s://localhost:%s/%s", secure ? "https" : "http", availablePort, HTTP_BASE_PATH);
}
@@ -572,40 +548,13 @@ public class TestListenHTTP {
}
}
- private void startWebServerAndSendRequests(Runnable sendRequestToWebserver, int numberOfExpectedFlowFiles) throws Exception {
+ private void startWebServerAndSendMessages(final List<String> messages, final int expectedStatusCode, final boolean secure, final boolean clientAuthRequired) throws Exception { // starts the server, then posts each message on the calling thread
startWebServer();
- new Thread(sendRequestToWebserver).start();
- final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
- final ProcessContext context = runner.getProcessContext();
- int numTransferred = 0;
- long startTime = System.currentTimeMillis();
- while (numTransferred < numberOfExpectedFlowFiles && (System.currentTimeMillis() - startTime < RESPONSE_TIMEOUT)) {
- proc.onTrigger(context, processSessionFactory);
- numTransferred = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).size();
- Thread.sleep(SEND_REQUEST_SLEEP);
+ for (final String message : messages) { // messages are sent sequentially, in list order
+ final int statusCode = postMessage(message, secure, clientAuthRequired);
+ assertEquals("HTTP Status Code not matched", expectedStatusCode, statusCode); // the first unexpected status fails the test immediately
}
-
- runner.assertTransferCount(ListenHTTP.RELATIONSHIP_SUCCESS, numberOfExpectedFlowFiles);
- }
-
- private void startWebServerAndSendMessages(final List<String> messages, int returnCode, boolean secure, boolean twoWaySsl)
- throws Exception {
-
- Runnable sendMessagesToWebServer = () -> {
- try {
- for (final String message : messages) {
- if (executePOST(message, secure, twoWaySsl) != returnCode) {
- fail("HTTP POST failed.");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail("Not expecting error here.");
- }
- };
-
- startWebServerAndSendRequests(sendMessagesToWebServer, messages.size());
}
private void configureProcessorSslContextService(final ListenHTTP.ClientAuthentication clientAuthentication,
@@ -627,75 +576,66 @@ public class TestListenHTTP {
runner.enableControllerService(sslContextService);
}
-
@Test
- public void testMultipartFormDataRequest() throws Exception {
-
+ public void testMultipartFormDataRequest() throws IOException {
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_OK));
final SSLContextService sslContextService = runner.getControllerService(SSL_CONTEXT_SERVICE_IDENTIFIER, SSLContextService.class);
final boolean isSecure = (sslContextService != null);
+ startWebServer();
- Runnable sendRequestToWebserver = () -> {
- try {
- File file1 = createTextFile("my-file-text-", ".txt", "Hello", "World");
- File file2 = createTextFile("my-file-text-", ".txt", "{ \"name\":\"John\", \"age\":30 }");
-
- MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM)
- .addFormDataPart("p1", "v1")
- .addFormDataPart("p2", "v2")
- .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(MediaType.parse("text/plain"), file1))
- .addFormDataPart("file2", "my-file-data.json", RequestBody.create(MediaType.parse("application/json"), file2))
- .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+ File file1 = createTextFile("Hello", "World");
+ File file2 = createTextFile("{ \"name\":\"John\", \"age\":30 }");
+
+ MultipartBody multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM)
+ .addFormDataPart("p1", "v1")
+ .addFormDataPart("p2", "v2")
+ .addFormDataPart("file1", "my-file-text.txt", RequestBody.create(file1, MediaType.parse("text/plain")))
+ .addFormDataPart("file2", "my-file-data.json", RequestBody.create(file2, MediaType.parse("application/json")))
+ .addFormDataPart("file3", "my-file-binary.bin", RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
+ .build();
+
+ Request request =
+ new Request.Builder()
+ .url(buildUrl(isSecure))
+ .post(multipartBody)
.build();
- Request request =
- new Request.Builder()
- .url(buildUrl(isSecure))
- .post(multipartBody)
- .build();
-
- int timeout = 3000;
- OkHttpClient client = new OkHttpClient.Builder()
- .readTimeout(timeout, TimeUnit.MILLISECONDS)
- .writeTimeout(timeout, TimeUnit.MILLISECONDS)
- .build();
+ int timeout = 3000;
+ OkHttpClient client = new OkHttpClient.Builder()
+ .readTimeout(timeout, TimeUnit.MILLISECONDS)
+ .writeTimeout(timeout, TimeUnit.MILLISECONDS)
+ .build();
- try (Response response = client.newCall(request).execute()) {
- Files.deleteIfExists(Paths.get(String.valueOf(file1)));
- Files.deleteIfExists(Paths.get(String.valueOf(file2)));
- Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body().string()), response.isSuccessful());
- }
- } catch (final Throwable t) {
- t.printStackTrace();
- Assert.fail(t.toString());
- }
- };
+ try (Response response = client.newCall(request).execute()) {
+ Files.deleteIfExists(Paths.get(String.valueOf(file1)));
+ Files.deleteIfExists(Paths.get(String.valueOf(file2)));
+ Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body()), response.isSuccessful());
+ }
- startWebServerAndSendRequests(sendRequestToWebserver, 5);
runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS);
// Part fragments are not processed in the order we submitted them.
// We cannot rely on the order we sent them in.
- MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p1");
+ MockFlowFile mff = findFlowFile(flowFilesForRelationship, "p1");
mff.assertAttributeEquals("http.multipart.name", "p1");
mff.assertAttributeExists("http.multipart.size");
mff.assertAttributeEquals("http.multipart.fragments.sequence.number", "1");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "p2");
+ mff = findFlowFile(flowFilesForRelationship, "p2");
mff.assertAttributeEquals("http.multipart.name", "p2");
mff.assertAttributeExists("http.multipart.size");
mff.assertAttributeExists("http.multipart.fragments.sequence.number");
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file1");
+ mff = findFlowFile(flowFilesForRelationship, "file1");
mff.assertAttributeEquals("http.multipart.name", "file1");
mff.assertAttributeEquals("http.multipart.filename", "my-file-text.txt");
mff.assertAttributeEquals("http.headers.multipart.content-type", "text/plain");
@@ -704,7 +644,7 @@ public class TestListenHTTP {
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file2");
+ mff = findFlowFile(flowFilesForRelationship, "file2");
mff.assertAttributeEquals("http.multipart.name", "file2");
mff.assertAttributeEquals("http.multipart.filename", "my-file-data.json");
mff.assertAttributeEquals("http.headers.multipart.content-type", "application/json");
@@ -713,7 +653,7 @@ public class TestListenHTTP {
mff.assertAttributeEquals("http.multipart.fragments.total.number", "5");
mff.assertAttributeExists("http.headers.multipart.content-disposition");
- mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", "file3");
+ mff = findFlowFile(flowFilesForRelationship, "file3");
mff.assertAttributeEquals("http.multipart.name", "file3");
mff.assertAttributeEquals("http.multipart.filename", "my-file-binary.bin");
mff.assertAttributeEquals("http.headers.multipart.content-type", "application/octet-stream");
@@ -723,23 +663,24 @@ public class TestListenHTTP {
mff.assertAttributeExists("http.headers.multipart.content-disposition");
}
- private byte[] generateRandomBinaryData(int i) {
+ private byte[] generateRandomBinaryData() {
byte[] bytes = new byte[100];
new Random().nextBytes(bytes);
return bytes;
}
- private File createTextFile(String prefix, String extension, String...lines) throws IOException {
- Path file = Files.createTempFile(prefix, extension);
- try (FileOutputStream fos = new FileOutputStream(file.toFile())) {
+ private File createTextFile(String...lines) throws IOException {
+ final File textFile = Files.createTempFile(TestListenHTTP.class.getSimpleName(), ".txt").toFile();
+ textFile.deleteOnExit();
+
+ try (FileOutputStream fos = new FileOutputStream(textFile)) {
IOUtils.writeLines(Arrays.asList(lines), System.lineSeparator(), fos, Charsets.UTF_8);
}
- return file.toFile();
+ return textFile;
}
- protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
- Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
- Assert.assertTrue(optional.isPresent());
- return optional.get();
+ protected MockFlowFile findFlowFile(final List<MockFlowFile> flowFiles, final String attributeValue) {
+ final Optional<MockFlowFile> foundFlowFile = flowFiles.stream().filter(flowFile -> flowFile.getAttribute(MULTIPART_ATTRIBUTE).equals(attributeValue)).findFirst();
+ return foundFlowFile.orElseThrow(() -> new NullPointerException(MULTIPART_ATTRIBUTE));
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
index 46da31b..05e01ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
@@ -24,27 +24,26 @@ import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ServerSocketFactory;
public class TCPTestServer implements Runnable {
private final InetAddress ipAddress;
- private int port;
private final String messageDelimiter;
+ private final ArrayBlockingQueue<List<Byte>> queue;
+ private final AtomicInteger totalNumConnections = new AtomicInteger();
+ private final boolean closeOnMessageReceived;
+
private volatile ServerSocket serverSocket;
- private final ArrayBlockingQueue<List<Byte>> recvQueue;
private volatile Socket connectionSocket;
- public final static String DEFAULT_MESSAGE_DELIMITER = "\n";
- private volatile int totalNumConnections = 0;
-
- public TCPTestServer(final InetAddress ipAddress, final ArrayBlockingQueue<List<Byte>> recvQueue) {
- this(ipAddress, recvQueue, DEFAULT_MESSAGE_DELIMITER);
- }
+ private int port;
- public TCPTestServer(final InetAddress ipAddress, final ArrayBlockingQueue<List<Byte>> recvQueue, final String messageDelimiter) {
+ public TCPTestServer(final InetAddress ipAddress, final ArrayBlockingQueue<List<Byte>> queue, final String messageDelimiter, final boolean closeOnMessageReceived) {
this.ipAddress = ipAddress;
- this.recvQueue = recvQueue;
+ this.queue = queue;
this.messageDelimiter = messageDelimiter;
+ this.closeOnMessageReceived = closeOnMessageReceived;
}
public synchronized void startServer(final ServerSocketFactory serverSocketFactory) throws Exception {
@@ -91,7 +90,10 @@ public class TCPTestServer implements Runnable {
}
private void storeReceivedMessage(final List<Byte> message) {
- recvQueue.add(message);
+ queue.add(message);
+ if (closeOnMessageReceived) {
+ shutdownConnection();
+ }
}
private boolean isServerRunning() {
@@ -102,18 +104,14 @@ public class TCPTestServer implements Runnable {
return connectionSocket != null && !connectionSocket.isClosed();
}
- public List<Byte> getReceivedMessage() {
- return recvQueue.poll();
- }
-
public int getTotalNumConnections() {
- return totalNumConnections;
+ return totalNumConnections.get();
}
protected boolean isDelimiterPresent(final List<Byte> message) {
if (messageDelimiter != null && message.size() >= messageDelimiter.length()) {
for (int i = 1; i <= messageDelimiter.length(); i++) {
- if (message.get(message.size() - i).byteValue() == messageDelimiter.charAt(messageDelimiter.length() - i)) {
+ if (message.get(message.size() - i) == messageDelimiter.charAt(messageDelimiter.length() - i)) {
if (i == messageDelimiter.length()) {
return true;
}
@@ -142,12 +140,12 @@ public class TCPTestServer implements Runnable {
try {
while (isServerRunning()) {
connectionSocket = serverSocket.accept();
- totalNumConnections++;
- InputStream in = connectionSocket.getInputStream();
+ totalNumConnections.incrementAndGet();
+ final InputStream inputStream = connectionSocket.getInputStream();
while (isConnected()) {
- final List<Byte> message = new ArrayList<Byte>();
+ final List<Byte> message = new ArrayList<>();
while (true) {
- final int c = in.read();
+ final int c = inputStream.read();
if (c < 0) {
if (!message.isEmpty()) {
storeReceivedMessage(message);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
deleted file mode 100644
index c20e9ca..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
+++ /dev/null
@@ -1,2187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import javax.net.ssl.SSLContext;
-import javax.servlet.MultipartConfigElement;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.Part;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processors.standard.InvokeHTTP;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.remote.io.socket.NetworkUtils;
-import org.apache.nifi.security.util.ClientAuth;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.StringUtils;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.web.util.JettyServerUtils;
-import org.eclipse.jetty.http.MultiPartFormInputStream;
-import org.eclipse.jetty.security.ConstraintSecurityHandler;
-import org.eclipse.jetty.security.DefaultIdentityService;
-import org.eclipse.jetty.security.HashLoginService;
-import org.eclipse.jetty.security.ServerAuthException;
-import org.eclipse.jetty.security.authentication.DigestAuthenticator;
-import org.eclipse.jetty.server.Authentication;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.util.MultiPartInputStreamParser;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.commons.codec.binary.Base64.encodeBase64;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public abstract class TestInvokeHttpCommon {
-
- protected static Server server;
-
- protected static String url;
-
- private static final long READ_TIMEOUT_SLEEP = 1000;
-
- protected TestRunner runner;
-
- protected static void configureServer(final SSLContext sslContext, final ClientAuth clientAuth) throws Exception {
- final int port = NetworkUtils.availablePort();
-
- final String protocol = sslContext == null ? "http" : "https";
- setUrl(protocol, port);
-
- final Server configuredServer = JettyServerUtils.createServer(port, sslContext, clientAuth);
- final ServerConnector connector = new ServerConnector(configuredServer);
- connector.setPort(port);
-
- JettyServerUtils.startServer(configuredServer);
- setServer(configuredServer);
- }
-
- private static void setUrl(final String scheme, final int port) {
- url = String.format("%s://localhost:%d", scheme, port);
- }
-
- private static void setServer(final Server configuredServer) {
- server = configuredServer;
- }
-
- protected static void addHandler(final Handler handler) {
- JettyServerUtils.addHandler(server, handler);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- if (server != null) {
- server.stop();
- server.destroy();
- }
- }
-
- @After
- public void after() {
- JettyServerUtils.clearHandlers(server);
- runner.shutdown();
- }
-
- @Test
- public void testDateGeneration() throws Exception {
-
- final DateHandler dh = new DateHandler();
- addHandler(dh);
-
- runner.setProperty(InvokeHTTP.PROP_URL, url);
- createFlowFiles(runner);
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
-
- // extract the date string sent to the server
- // and store it as a java.util.Date
- final SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);
- final Date date = sdf.parse(dh.dateString);
-
- // calculate the difference between the date string sent by the client and
- // the current system time -- these should be within a second or two
- // (just enough time to run the test).
- //
- // If the difference is more like in hours, it's likely that a timezone
- // conversion caused a problem.
- final long diff = Math.abs(System.currentTimeMillis() - date.getTime());
- final long threshold = 15000; // 15 seconds
- if (diff > threshold) {
- fail("Difference (" + diff + ") was greater than threshold (" + threshold + ")");
- }
- }
-
- @Test
- public void test200() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
-
- createFlowFiles(runner);
-
- // Verify only one FlowFile gets created/sent
- runner.run();
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
-
- final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
- assertEquals(2, provEvents.size());
- boolean forkEvent = false;
- boolean fetchEvent = false;
- for (final ProvenanceEventRecord event : provEvents) {
- if (event.getEventType() == ProvenanceEventType.FORK) {
- forkEvent = true;
- } else if (event.getEventType() == ProvenanceEventType.FETCH) {
- fetchEvent = true;
- }
- }
-
- assertTrue(forkEvent);
- assertTrue(fetchEvent);
- }
-
- @Test
- public void testOutputResponseRegardless() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("NO".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testOutputResponseRegardlessWithOutputInAttribute() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
- runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
- bundle.assertAttributeEquals("outputBody", "NO");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("NO".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testOutputResponseSetMimeTypeToResponseContentType() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- String statusUrl = "/status/200";
- runner.setProperty(InvokeHTTP.PROP_URL, url + statusUrl);
- runner.setProperty(InvokeHTTP.PROP_METHOD, "GET");
- runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
- runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("outputBody", statusUrl);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals(statusUrl.getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- bundle1.assertAttributeEquals("mime.type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testOutputResponseRegardlessWithOutputInAttributeLarge() throws Exception {
- addHandler(new GetLargeHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
- runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
- runner.setProperty(InvokeHTTP.PROP_PUT_ATTRIBUTE_MAX_LENGTH,"11");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
- bundle.assertAttributeEquals("outputBody", "Lorem ipsum");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
- + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor "
- + "in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, "
- + "sunt in culpa qui officia deserunt mollit anim id est laborum.");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
-
- @Test
- public void testMultipleSameHeaders() throws Exception {
- addHandler(new GetMultipleHeaderHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("double", "1, 2");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testPutResponseHeadersInRequest() throws Exception {
- addHandler(new GetMultipleHeaderHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_ADD_HEADERS_TO_REQUEST, "true");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+all attributes from response)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
- bundle.assertAttributeEquals("double", "1, 2");
- bundle.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("double", "1, 2");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testToRequestAttribute() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code, status.message and body of response in attribute
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals("outputBody", "/status/200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
- }
-
- @Test
- public void testNoInput() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD,"GET");
- runner.setIncomingConnection(false);
- runner.setNonLoopConnection(false);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testNoInputWithAttributes() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD, "GET");
- runner.setProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "myAttribute");
- runner.setIncomingConnection(false);
- runner.setNonLoopConnection(false);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testNoInputFail() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setIncomingConnection(false);
- runner.setNonLoopConnection(false);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION");
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
- }
-
- @Test
- public void testNoInputSendToAttribute() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, "outputBody");
- runner.setIncomingConnection(false);
- runner.setNonLoopConnection(false);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request
- // status code, status message, no ff content
- // server response message body into attribute of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals("outputBody", "/status/200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- }
-
- @Test
- public void test200Auth() throws Exception {
- addHandler(new BasicAuthHandler());
-
- final String username = "basic_user";
- final String password = "basic_password";
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username);
- runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password);
- final byte[] creds = String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8);
- final String expAuth = String.format("Basic %s", new String(encodeBase64(creds)));
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- // expected in response
- // status code, status message, all headers from server response --> ff attributes
- // server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
-
- final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
- assertEquals(2, provEvents.size());
- boolean forkEvent = false;
- boolean fetchEvent = false;
- for (final ProvenanceEventRecord event : provEvents) {
- if (event.getEventType() == ProvenanceEventType.FORK) {
- forkEvent = true;
- } else if (event.getEventType() == ProvenanceEventType.FETCH) {
- fetchEvent = true;
- }
- }
-
- assertTrue(forkEvent);
- assertTrue(fetchEvent);
- }
-
- @Test
- public void test401NotAuth() throws Exception {
- addHandler(new BasicAuthHandler());
-
- final String username = "basic_user";
- final String password = "basic_password";
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/401");
- runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username);
- runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password);
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in request status.code and status.message
- // original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "401");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Unauthorized");
- bundle.assertAttributeEquals("Foo", "Bar");
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
-
- final String response = bundle.getAttribute(InvokeHTTP.RESPONSE_BODY);
- assertEquals(response, "Get off my lawn!"+System.lineSeparator());
- }
-
- @Test
- public void test200DigestAuth() throws Exception {
- addHandler(new DigestAuthHandler());
- final String username = "basic_user";
- final String password = "basic_password";
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username);
- runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password);
- runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH,"true");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- //expected in request status.code and status.message
- //original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
-
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals(("DIGEST"+System.lineSeparator()).getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void test401DigestNotAuth() throws Exception {
- addHandler(new DigestAuthHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH,"false");
- runner.setProperty(InvokeHTTP.PROP_PUT_ATTRIBUTE_MAX_LENGTH,"512");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- //expected in request status.code and status.message
- //original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "401");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Unauthorized");
- bundle.assertAttributeEquals("Foo", "Bar");
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void test500() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/500");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(1);
-
- // expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0);
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "500");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Server Error");
- bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/500");
-
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- }
-
- @Test
- public void test300() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/302");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
- // expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
-
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "302");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Found");
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- }
-
- @Test
- public void test304() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/304");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
- // expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
-
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "304");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Modified");
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- }
-
- @Test
- public void test400() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
- // expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
-
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request");
- bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400");
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- }
-
- @Test
- public void test400WithPenalizeNoRetry() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400");
- runner.setProperty(InvokeHTTP.PROP_PENALIZE_NO_RETRY, "true");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(1);
- // expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
-
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request");
- bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400");
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
- }
-
- @Test
- public void test412() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/412");
- runner.setProperty(InvokeHTTP.PROP_METHOD, "GET");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- // expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
-
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "412");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Precondition Failed");
- bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/412");
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- }
-
- @Test
- public void testHead() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "HEAD");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- final String expected1 = "";
- Assert.assertEquals(expected1, actual1);
- }
-
- @Test
- public void testPost() throws Exception {
- addHandler(new MutativeMethodHandler(MutativeMethod.POST));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeNotExists("Content-Type");
-
- final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- final String expected1 = "";
- Assert.assertEquals(expected1, actual1);
- }
-
- @Test
- public void testPostWithMimeType() throws Exception {
- final String suppliedMimeType = "text/plain";
- addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
-
- final Map<String, String> attrs = new HashMap<>();
-
- attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostWithEmptyELExpression() throws Exception {
- addHandler(new MutativeMethodHandler(MutativeMethod.POST, InvokeHTTP.DEFAULT_CONTENT_TYPE));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostWithContentTypeProperty() throws Exception {
- final String suppliedMimeType = "text/plain";
- addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostWithEmptyBodySet() throws Exception {
- final String suppliedMimeType = "";
- addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
-
- runner.setNonLoopConnection(false);
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
- runner.setProperty(InvokeHTTP.PROP_SEND_BODY, "false");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostWithFormDataWithFileName() throws Exception {
- final String suppliedMimeType = "text/plain";
- MultipartFormHandler handler = new MultipartFormHandler();
- handler.addExpectedPart("name1", "form data 1");
- handler.addExpectedPart("name2", "form data 2");
- handler.addExpectedPart("content", "Hello");
- handler.addFileName("content", "file_name");
- addHandler(handler);
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- // dynamic form properties
- PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name1")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp1, "form data 1");
-
- PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name2")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp2, "form data 2");
-
- runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME ,"content");
- runner.setProperty(InvokeHTTP.PROP_SET_FORM_FILE_NAME, "true");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- attrs.put(CoreAttributes.FILENAME.key(), "file_name");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostFormContentOnly() throws Exception {
- final String suppliedMimeType = "text/plain";
- MultipartFormHandler handler = new MultipartFormHandler();
- handler.addExpectedPart("content", "Hello");
- handler.addFileName("content", "file_name");
- addHandler(handler);
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME ,"content");
- runner.setProperty(InvokeHTTP.PROP_SET_FORM_FILE_NAME, "true");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- attrs.put(CoreAttributes.FILENAME.key(), "file_name");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostWithFormDataNoFileName() throws Exception {
- final String suppliedMimeType = "text/plain";
- MultipartFormHandler handler = new MultipartFormHandler();
- handler.addExpectedPart("name1", "form data 1");
- handler.addExpectedPart("name2", "form data 2");
- handler.addExpectedPart("content", "Hello");
- addHandler(handler);
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- // dynamic form properties
- PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name1")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp1, "form data 1");
-
- PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name2")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp2, "form data 2");
-
- runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME ,"content");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostWithFormDataNoFile() throws Exception {
- final String suppliedMimeType = "text/plain";
- MultipartFormHandler handler = new MultipartFormHandler();
- handler.addExpectedPart("name1", "form data 1");
- handler.addExpectedPart("name2", "form data 2");
- addHandler(handler);
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
- runner.setProperty(InvokeHTTP.PROP_SEND_BODY, "false");
-
- // dynamic form properties
- PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name1")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp1, "form data 1");
-
- PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name2")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp2, "form data 2");
-
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPostNoSendBodyWithContentFails() throws Exception {
- final String suppliedMimeType = "text/plain";
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
- runner.setProperty(InvokeHTTP.PROP_SEND_BODY, "false");
-
- runner.setProperty(InvokeHTTP.PROP_FORM_BODY_FORM_NAME ,"content");
- runner.assertNotValid();
- }
-
- @Test
- public void testPostNoFormContentWithFileNameFails() throws Exception {
- final String suppliedMimeType = "text/plain";
- runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- // dynamic form properties
- PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name1")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp1, "form data 1");
-
- PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
- .dynamic(true)
- .name(InvokeHTTP.FORM_BASE + ":name2")
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- runner.setProperty(dynamicProp2, "form data 2");
-
- runner.setProperty(InvokeHTTP.PROP_SET_FORM_FILE_NAME, "true");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- attrs.put(CoreAttributes.FILENAME.key(), "file_name");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.assertNotValid();
- }
-
- @Test
- public void testPutWithMimeType() throws Exception {
- final String suppliedMimeType = "text/plain";
- addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
-
- final Map<String, String> attrs = new HashMap<>();
-
- attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPutWithEmptyELExpression() throws Exception {
- addHandler(new MutativeMethodHandler(MutativeMethod.PUT, InvokeHTTP.DEFAULT_CONTENT_TYPE));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPutWithContentTypeProperty() throws Exception {
- final String suppliedMimeType = "text/plain";
- addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPut() throws Exception {
- addHandler(new MutativeMethodHandler(MutativeMethod.PUT));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeNotExists("Content-Type");
-
- final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- final String expected1 = "";
- Assert.assertEquals(expected1, actual1);
- }
-
- @Test
- public void testPatch() throws Exception {
- addHandler(new MutativeMethodHandler(MutativeMethod.PATCH));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PATCH");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/patch");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeNotExists("Content-Type");
-
- final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- final String expected1 = "";
- Assert.assertEquals(expected1, actual1);
- }
-
- @Test
- public void testPatchWithMimeType() throws Exception {
- final String suppliedMimeType = "text/plain";
- addHandler(new MutativeMethodHandler(MutativeMethod.PATCH, suppliedMimeType));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PATCH");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/patch");
-
- final Map<String, String> attrs = new HashMap<>();
-
- attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPatchWithEmptyELExpression() throws Exception {
- addHandler(new MutativeMethodHandler(MutativeMethod.PATCH, InvokeHTTP.DEFAULT_CONTENT_TYPE));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PATCH");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/patch");
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testPatchWithContentTypeProperty() throws Exception {
- final String suppliedMimeType = "text/plain";
- addHandler(new MutativeMethodHandler(MutativeMethod.PATCH, suppliedMimeType));
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "PATCH");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/patch");
- runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
-
- final Map<String, String> attrs = new HashMap<>();
- attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
- runner.enqueue("Hello".getBytes(), attrs);
-
- runner.run(1);
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- }
-
- @Test
- public void testDelete() throws Exception {
- addHandler(new DeleteHandler());
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "DELETE");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- final String expected1 = "";
- Assert.assertEquals(expected1, actual1);
- }
-
- @Test
- public void testOptions() throws Exception {
- addHandler(new OptionsHandler());
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "OPTIONS");
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- }
-
- @Test
- public void testSendAttributes() throws Exception {
- addHandler(new AttributesSentHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "F.*");
- runner.setProperty("dynamicHeader","yes!");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- //expected in request status.code and status.message
- //original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle.assertAttributeEquals("Foo", "Bar");
-
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("Bar".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("dynamicHeader","yes!");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- }
-
- @Test
- public void testReadTimeout() throws Exception {
- addHandler(new ReadTimeoutHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, String.format("%d ms", READ_TIMEOUT_SLEEP / 2));
-
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- runner.assertPenalizeCount(1);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
-
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
- }
-
- @Test
- public void testConnectFailBadPort() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- // this is the bad urls
- final String badurlport = "http://localhost:" + 445;
-
- runner.setProperty(InvokeHTTP.PROP_URL, badurlport + "/doesnotExist");
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- runner.assertPenalizeCount(1);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
-
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
- }
-
- @Test
- public void testConnectFailBadHost() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- final String badurlhost = "http://localhOOst:" + 445;
-
- runner.setProperty(InvokeHTTP.PROP_URL, badurlhost + "/doesnotExist");
- createFlowFiles(runner);
-
- runner.run();
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- runner.assertPenalizeCount(1);
-
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
-
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
- }
-
- @Test
- public void testArbitraryRequest() throws Exception {
- addHandler(new FetchHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD,"FETCH");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
-
- //expected in request status.code and status.message
- //original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
- bundle1.assertAttributeEquals("Content-Type", "text/plain;charset=iso-8859-1");
- final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
- final String expected1 = "/status/200";
- Assert.assertEquals(expected1, actual1);
- }
-
- @Test
- public void testChunkedRequest() throws Exception {
- MutativeMethodHandler mutativeMethodHandler = new MutativeMethodHandler(MutativeMethod.POST);
- mutativeMethodHandler.setHeaderToTrack("Transfer-encoding");
- addHandler(mutativeMethodHandler);
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.PROP_METHOD,"POST");
- runner.setProperty(InvokeHTTP.PROP_USE_CHUNKED_ENCODING,"true");
-
- createFlowFiles(runner);
-
- runner.run();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
-
- //expected in request status.code and status.message
- //original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- final String expected = "Hello";
- Assert.assertEquals(expected, actual);
- bundle.assertAttributeEquals("Foo", "Bar");
-
- //expected in response
- //status code, status message, all headers from server response --> ff attributes
- //server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
- bundle1.assertAttributeEquals("Foo", "Bar");
-
- String header = mutativeMethodHandler.getTrackedHeaderValue();
- Assert.assertEquals("chunked",header);
- }
-
- @Test
- public void testTrustedHostname() throws Exception {
- addHandler(new GetOrHeadHandler());
-
- runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
- runner.setProperty("Trusted Hostname", "https://example.com/");
- runner.assertValid();
-
- runner.setProperty(InvokeHTTP.PROP_METHOD, "GET");
- runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
- runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
- runner.assertValid();
-
- createFlowFiles(runner);
- runner.run();
-
- runner.assertValid();
-
- runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
- runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY,0);
- runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- runner.assertPenalizeCount(0);
- }
-
-
- public static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
- attributes.put("Foo", "Bar");
- testRunner.enqueue("Hello".getBytes("UTF-8"), attributes);
-
- }
-
-
- protected static class DateHandler extends AbstractHandler {
-
- private String dateString;
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- dateString = request.getHeader("Date");
-
- response.setStatus(200);
- response.setContentType("text/plain");
- response.getWriter().println("Way to go!");
- }
- }
-
- private enum MutativeMethod { POST, PUT, PATCH }
-
-
- public static class MutativeMethodHandler extends AbstractHandler {
- private final MutativeMethod method;
- private final String expectedContentType;
- private String headerToTrack;
- private String trackedHeaderValue;
-
- public MutativeMethodHandler(final MutativeMethod method) {
- this(method, "application/plain-text");
- }
-
- public MutativeMethodHandler(final MutativeMethod method, final String expectedContentType) {
- this.method = method;
- this.expectedContentType = expectedContentType;
- }
- private void setHeaderToTrack(String headerToTrack){
- this.headerToTrack = headerToTrack;
- }
-
- public String getTrackedHeaderValue(){
- return trackedHeaderValue;
- }
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
-
- baseRequest.setHandled(true);
-
- if(method.name().equals(request.getMethod())) {
- if(this.expectedContentType.isEmpty()) {
- Assert.assertNull(request.getHeader("Content-Type"));
- } else {
- assertEquals(this.expectedContentType,request.getHeader("Content-Type"));
- }
-
- final String body = request.getReader().readLine();
- this.trackedHeaderValue = baseRequest.getHttpFields().get(headerToTrack);
-
- if(this.expectedContentType.isEmpty()) {
- Assert.assertNull(body);
- } else {
- assertEquals("Hello", body);
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
-
- }
-
- }
-
- public static class MultipartFormHandler extends AbstractHandler {
- private static final String MULTIPART_FORMDATA_TYPE = "multipart/form-data";
-
- private String headerToTrack;
- private String trackedHeaderValue;
- private final HashMap<String,String> expectedParts = new HashMap<>();
- private String fileNamePartName = null;
- private String fileName = null;
-
- public MultipartFormHandler() {
- }
-
- public void addExpectedPart(String name, String value) {
- expectedParts.put(name,value);
- }
-
- public void addFileName(String partName, String fileName) {
- fileNamePartName = partName;
- this.fileName = fileName;
- }
-
- private void setHeaderToTrack(String headerToTrack) {
- this.headerToTrack = headerToTrack;
- }
-
- public String getTrackedHeaderValue() {
- return trackedHeaderValue;
- }
-
- private boolean isMultipartRequest(HttpServletRequest request) {
- return request.getContentType() != null
- && request.getContentType().startsWith(MULTIPART_FORMDATA_TYPE);
- }
- private static final MultipartConfigElement MULTI_PART_CONFIG = new MultipartConfigElement(
- System.getProperty("java.io.tmpdir"));
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
-
- baseRequest.setHandled(true);
-
- assertTrue(request.getHeader("Content-Type").startsWith("multipart/form-data"));
-
- this.trackedHeaderValue = baseRequest.getHttpFields().get(headerToTrack);
- boolean multipartRequest = isMultipartRequest(request);
- if (multipartRequest) {
- baseRequest.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, MULTI_PART_CONFIG);
- if (expectedParts.size() > 0) {
- assertEquals(expectedParts.size(), request.getParts().size());
- }
- for (Part part : request.getParts()) {
- String name;
- String val;
- String partFileName;
- if (part instanceof MultiPartInputStreamParser.MultiPart) {
- MultiPartInputStreamParser.MultiPart multiPart = ((MultiPartInputStreamParser.MultiPart) part);
- val = new String(multiPart.getBytes());
- name = part.getName();
- partFileName = part.getSubmittedFileName();
- } else if (part instanceof MultiPartFormInputStream.MultiPart) {
- MultiPartFormInputStream.MultiPart multiPart = ((MultiPartFormInputStream.MultiPart) part);
- val = new String(multiPart.getBytes());
- name = part.getName();
- partFileName = part.getSubmittedFileName();
- } else {
- name = "NO";
- val = "NO";
- partFileName = "NO";
- }
-
- if (expectedParts.size() > 0) {
- assertNotNull(expectedParts.get(name));
- assertEquals(expectedParts.get(name), val);
- }
- if (!StringUtils.isBlank(fileNamePartName)) {
- if (name.equals(fileNamePartName)) {
- assertEquals(fileName, partFileName);
- }
- }
- }
- }
- }
-
- }
-
- public static class GetOrHeadHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
-
- if ("GET".equalsIgnoreCase(request.getMethod())) {
- if (status == 304){
- // Status code 304 ("Not Modified") must not contain a message body
- return;
- }
-
- response.setContentType("text/plain");
- response.setContentLength(target.length());
- response.setHeader("Cache-Control", "public,max-age=1");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(target);
- writer.flush();
- }
- } else if(!"HEAD".equalsIgnoreCase(request.getMethod())) {
- response.setStatus(404);
- response.setContentType("text/plain");
- String body = "NO";
- response.setContentLength(body.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(body);
- writer.flush();
- }
- }
-
- }
- }
-
- public static class GetLargeHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
-
- response.setContentType("text/plain");
- response.setContentLength(target.length());
-
- if ("GET".equalsIgnoreCase(request.getMethod())) {
- try (PrintWriter writer = response.getWriter()) {
- writer.print(target);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
-
- //Lorem Ipsum
- String body = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
- + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor "
- + "in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, "
- + "sunt in culpa qui officia deserunt mollit anim id est laborum.";
-
- response.setContentLength(body.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(body);
- writer.flush();
- }
- }
-
- }
- }
-
- public static class GetMultipleHeaderHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
-
- response.setContentType("text/plain");
- response.setContentLength(target.length());
-
- if ("GET".equalsIgnoreCase(request.getMethod())) {
- response.addHeader("double", "1");
- response.addHeader("double", "2");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(target);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
-
- }
- }
-
- public static class DeleteHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- if ("DELETE".equalsIgnoreCase(request.getMethod())) {
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
- response.setContentLength(0);
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
- }
- }
-
- public static class OptionsHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- if ("OPTIONS".equalsIgnoreCase(request.getMethod())) {
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
- response.setContentLength(target.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(target);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(target.length());
- }
- }
- }
-
- public static class AttributesSentHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- if ("Get".equalsIgnoreCase(request.getMethod())) {
- String headerValue = request.getHeader("Foo");
- response.setHeader("dynamicHeader",request.getHeader("dynamicHeader"));
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
- response.setContentLength(headerValue.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(headerValue);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
- }
- }
-
- public static class ReadTimeoutHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- if ("Get".equalsIgnoreCase(request.getMethod())) {
- try {
- Thread.sleep(READ_TIMEOUT_SLEEP);
- } catch (InterruptedException e) {
- return;
- }
- String headerValue = request.getHeader("Foo");
- headerValue = headerValue == null ? "" : headerValue;
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
- response.setContentLength(headerValue.length());
- response.setContentType("text/plain");
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(headerValue);
- writer.flush();
- }
- } else {
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(0);
- }
- }
- }
-
- public static class BasicAuthHandler extends AbstractHandler {
-
- private String authString;
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- authString = request.getHeader("Authorization");
-
- if (authString == null) {
- response.setStatus(401);
- response.setHeader("WWW-Authenticate", "Basic realm=\"Jetty\"");
- response.setHeader("response.phrase", "Unauthorized");
- response.setContentType("text/plain");
- response.getWriter().println("Get off my lawn!");
- return;
- }
-
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
-
- if (status == 200) {
- response.setStatus(status);
- response.setContentType("text/plain");
- response.getWriter().println(authString);
- } else {
- response.setStatus(status);
- response.setContentType("text/plain");
- response.getWriter().println("Get off my lawn!");
- }
- }
- }
-
- public static class DigestAuthHandler extends AbstractHandler {
-
- private DigestAuthenticator digestAuthenticator;
-
- private DigestAuthHandler() throws Exception {
- digestAuthenticator = new DigestAuthenticator();
- ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
-
- final HashLoginService hashLoginService = new HashLoginService("realm", "src/test/resources/TestInvokeHttp/realm.properties");
- hashLoginService.start();
-
- securityHandler.setLoginService(hashLoginService);
- securityHandler.setIdentityService(new DefaultIdentityService());
- digestAuthenticator.setConfiguration(securityHandler);
- }
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse
- response)throws IOException, ServletException {
- baseRequest.setHandled(true);
-
- try {
- Authentication authentication = digestAuthenticator.validateRequest(request, response, true);
-
- if (authentication instanceof Authentication.User) {
- response.setContentType("text/plain");
- Authentication.User user = (Authentication.User) authentication;
- response.getWriter().println(user.getAuthMethod());
- } else if (authentication instanceof Authentication.ResponseSent) {
- Authentication.ResponseSent responseSent = (Authentication.ResponseSent) authentication;
- }
- } catch (ServerAuthException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static class FetchHandler extends AbstractHandler {
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
- baseRequest.setHandled(true);
-
-
- if ("Fetch".equalsIgnoreCase(request.getMethod())) {
- final int status = Integer.valueOf(target.substring("/status".length() + 1));
- response.setStatus(status);
- response.setContentType("text/plain");
- response.setContentLength(target.length());
-
- try (PrintWriter writer = response.getWriter()) {
- writer.print(target);
- writer.flush();
- }
- } else {
-
- response.setStatus(404);
- response.setContentType("text/plain");
- response.setContentLength(target.length());
- }
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
index 0e0acb6..3f94087 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
@@ -65,7 +65,7 @@ public abstract class TestPutTCPCommon {
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private TCPTestServer server;
- private int tcp_server_port;
+ private int port;
private ArrayBlockingQueue<List<Byte>> recvQueue;
public ServerSocketFactory serverSocketFactory;
@@ -82,20 +82,24 @@ public abstract class TestPutTCPCommon {
@Before
public void setup() throws Exception {
- recvQueue = new ArrayBlockingQueue<List<Byte>>(BUFFER_SIZE);
+ recvQueue = new ArrayBlockingQueue<>(BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutTCP.class);
runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
}
- private synchronized TCPTestServer createTestServer(final String address, final ArrayBlockingQueue<List<Byte>> recvQueue, final String delimiter) throws Exception {
- TCPTestServer server = new TCPTestServer(InetAddress.getByName(address), recvQueue, delimiter);
+ private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter, final boolean closeOnMessageReceived) throws Exception {
+ TCPTestServer server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), queue, delimiter, closeOnMessageReceived);
server.startServer(serverSocketFactory);
- tcp_server_port = server.getPort();
+ port = server.getPort();
return server;
}
+ private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter) throws Exception {
+ return createTestServer(queue, delimiter, false);
+ }
+
@After
- public void cleanup() throws Exception {
+ public void cleanup() {
runner.shutdown();
removeTestServer(server);
}
@@ -103,14 +107,13 @@ public abstract class TestPutTCPCommon {
private void removeTestServer(TCPTestServer server) {
if (server != null) {
server.shutdown();
- server = null;
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFiles() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@@ -119,8 +122,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFilesEL() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS_EL, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS_EL, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@@ -129,8 +132,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testPruneSenders() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(VALID_FILES.length, 0);
@@ -149,18 +152,18 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testMultiCharDelimiter() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testConnectionPerFlowFile() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, true, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER, true);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@@ -169,8 +172,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testConnectionFailure() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@@ -182,8 +185,8 @@ public abstract class TestPutTCPCommon {
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
@@ -192,8 +195,8 @@ public abstract class TestPutTCPCommon {
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testEmptyFile() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(EMPTY_FILE);
Thread.sleep(10);
checkRelationships(EMPTY_FILE.length, 0);
@@ -203,9 +206,9 @@ public abstract class TestPutTCPCommon {
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testlargeValidFile() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, true, true);
+ public void testLargeValidFile() throws Exception {
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
checkReceivedAllData(recvQueue, testData);
@@ -216,8 +219,8 @@ public abstract class TestPutTCPCommon {
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testInvalidIPAddress() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(INVALID_IP_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(INVALID_IP_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
@@ -229,8 +232,8 @@ public abstract class TestPutTCPCommon {
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testUnknownHostname() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
- configureProperties(UNKNOWN_HOST, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(UNKNOWN_HOST, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
@@ -249,10 +252,10 @@ public abstract class TestPutTCPCommon {
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testLoadTest() throws Exception {
- server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
+ server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
- configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+ configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
checkInputQueueIsEmpty();
@@ -276,7 +279,7 @@ public abstract class TestPutTCPCommon {
for (String item : testData) {
runner.enqueue(item.getBytes());
}
- runner.run(testData.length, false, i == 0 ? true : false);
+ runner.run(testData.length, false, i == 0);
}
}
@@ -292,7 +295,10 @@ public abstract class TestPutTCPCommon {
private void checkEmptyMessageReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
- assertEquals(0, recvQueue.poll().size());
+ final List<Byte> message = recvQueue.poll();
+
+ assertNotNull(message);
+ assertEquals(0, message.size());
}
private void checkInputQueueIsEmpty() {