You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by de...@apache.org on 2018/08/24 14:07:50 UTC

nifi git commit: NIFI-5275 PostHTTP SocketConfig setup, fixed connection pool when using HTTPS, setup idle connection checker, setup request retry handler, improved some exception handling and logging, and NIFI-1336 fix as well This closes #2796. Signed-off-by: Brandon Devries <de...@apache.org>

Repository: nifi
Updated Branches:
  refs/heads/master cfc858c90 -> 8c0705cb6


NIFI-5275 PostHTTP SocketConfig setup, fixed connection pool when using HTTPS, setup idle connection checker, setup request retry handler, improved some exception handling and logging, and NIFI-1336 fix as well
This closes #2796.
Signed-off-by: Brandon Devries <de...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8c0705cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8c0705cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8c0705cb

Branch: refs/heads/master
Commit: 8c0705cb6bb0a937f992adc7a5078fa4483a54d9
Parents: cfc858c
Author: Mike Moser <mo...@apache.org>
Authored: Thu Jun 14 21:15:39 2018 +0000
Committer: Brandon Devries <de...@apache.org>
Committed: Fri Aug 24 10:07:11 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/PostHTTP.java      | 456 ++++++++++---------
 1 file changed, 247 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8c0705cb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index 52c9fdf..ee28c0c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -26,7 +26,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+import java.security.Principal;
 import java.security.KeyManagementException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -53,31 +56,31 @@ import java.util.regex.Pattern;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response.Status;
-import org.apache.commons.io.IOUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpException;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpResponseInterceptor;
+import org.apache.http.NoHttpResponseException;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.HttpRequestRetryHandler;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpHead;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
-import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.config.SocketConfig;
 import org.apache.http.conn.ManagedHttpClientConnection;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
 import org.apache.http.conn.socket.PlainConnectionSocketFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.conn.ssl.SSLContextBuilder;
-import org.apache.http.conn.ssl.SSLContexts;
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.entity.ContentProducer;
 import org.apache.http.entity.EntityTemplate;
@@ -87,6 +90,8 @@ import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
 import org.apache.http.util.EntityUtils;
 import org.apache.http.util.VersionInfo;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -133,7 +138,8 @@ import org.apache.nifi.util.StringUtils;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"http", "https", "remote", "copy", "archive"})
 @CapabilityDescription("Performs an HTTP Post with the content of the FlowFile. "
-        + "Uses a connection pool with max number of connections equal to its Concurrent Tasks configuration.")
+        + "Uses a connection pool with max number of connections equal to "
+        + "the number of possible endpoints multiplied by the Concurrent Tasks configuration.")
 public class PostHTTP extends AbstractProcessor {
 
     public static final String CONTENT_TYPE_HEADER = "Content-Type";
@@ -154,6 +160,7 @@ public class PostHTTP extends AbstractProcessor {
     public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
     public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id";
     public static final String PROTOCOL_VERSION = "3";
+    public static final String REMOTE_DN = "remote.dn";
 
     public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
             .name("URL")
@@ -267,9 +274,15 @@ public class PostHTTP extends AbstractProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
-    private final AtomicReference<DestinationAccepts> acceptsRef = new AtomicReference<>();
     private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
-    private final ConcurrentMap<String, Config> configMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, DestinationAccepts> destinationAcceptsMap = new ConcurrentHashMap<>();
+    private volatile PoolingHttpClientConnectionManager connManager;
+    private volatile CloseableHttpClient client;
+    private volatile RequestConfig requestConfig;
+
+    // this is used when creating the HttpContext, which is a thread local variable that is used by
+    // HTTPClient to obtain an available, reusable connection
+    private volatile Principal principal;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -335,15 +348,15 @@ public class PostHTTP extends AbstractProcessor {
 
     @OnStopped
     public void onStopped() {
-        this.acceptsRef.set(null);
+        destinationAcceptsMap.clear();
 
-        for (final Map.Entry<String, Config> entry : configMap.entrySet()) {
-            final Config config = entry.getValue();
-            config.getConnectionManager().shutdown();
+        try {
+            connManager.shutdown();
+            client.close();
+        } catch (IOException e) {
+            getLogger().error("Could not properly shutdown connections", e);
         }
 
-        configMap.clear();
-
         final StreamThrottler throttler = throttlerRef.getAndSet(null);
         if (throttler != null) {
             try {
@@ -358,28 +371,18 @@ public class PostHTTP extends AbstractProcessor {
     public void onScheduled(final ProcessContext context) {
         final Double bytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
         this.throttlerRef.set(bytesPerSecond == null ? null : new LeakyBucketStreamThrottler(bytesPerSecond.intValue()));
-    }
-
-    private String getBaseUrl(final String url) {
-        final int index = url.indexOf("/", 9);
-        if (index < 0) {
-            return url;
-        }
-
-        return url.substring(0, index);
-    }
 
-    private Config getConfig(final String url, final ProcessContext context) {
-        final String baseUrl = getBaseUrl(url);
-        Config config = configMap.get(baseUrl);
-        if (config != null) {
-            return config;
-        }
+        String hostname = "unknown";
+        try {
+            hostname = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException ignore) {}
+        principal = new X500Principal("CN=" + hostname + ", OU=unknown, O=unknown, C=unknown");
 
-        final PoolingHttpClientConnectionManager conMan;
+        // setup the PoolingHttpClientConnectionManager
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         if (sslContextService == null) {
-            conMan = new PoolingHttpClientConnectionManager();
+            connManager = new PoolingHttpClientConnectionManager();
+
         } else {
             final SSLContext sslContext;
             try {
@@ -397,15 +400,110 @@ public class PostHTTP extends AbstractProcessor {
                             .register("http", PlainConnectionSocketFactory.getSocketFactory())
                             .build();
 
-            conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+            connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+        }
+
+        // setup SocketConfig
+        SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
+        socketConfigBuilder.setSoTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        SocketConfig socketConfig = socketConfigBuilder.build();
+        connManager.setDefaultSocketConfig(socketConfig);
+
+        // the +1 here accommodates math error calculating excess connections in AbstractConnPool.getPoolEntryBlocking()
+        connManager.setDefaultMaxPerRoute(context.getMaxConcurrentTasks() + 1);
+        // max total connections will get set in onTrigger(), because a new route will require increasing this
+        connManager.setMaxTotal(1);
+        // enable inactivity check, to detect and close idle connections
+        connManager.setValidateAfterInactivity(30_000);
+
+        // setup the HttpClientBuilder
+        final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+        clientBuilder.setConnectionManager(connManager);
+        clientBuilder.setUserAgent(context.getProperty(USER_AGENT).getValue());
+        clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
+            @Override
+            public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
+                final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
+                final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
+                if (!conn.isOpen()) {
+                    return;
+                }
+
+                final SSLSession sslSession = conn.getSSLSession();
+
+                if (sslSession != null) {
+                    final Certificate[] certChain = sslSession.getPeerCertificates();
+                    if (certChain == null || certChain.length == 0) {
+                        throw new SSLPeerUnverifiedException("No certificates found");
+                    }
+
+                    try {
+                        final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
+                        httpContext.setAttribute(REMOTE_DN, cert.getSubjectDN().getName().trim());
+                    } catch (CertificateException e) {
+                        final String msg = "Could not extract subject DN from SSL session peer certificate";
+                        getLogger().warn(msg);
+                        throw new SSLPeerUnverifiedException(msg);
+                    }
+                }
+            }
+        });
+
+        HttpRequestRetryHandler retryHandler = (exception, attempt, httpContext) -> {
+            if (attempt > 3 || !isScheduled()) {
+                return false;
+            }
+            final HttpClientContext clientContext = HttpClientContext.adapt(httpContext);
+            // A heavily loaded remote listener can manifest as NoHttpResponseExceptions here.
+            // When this happens, take a 5 second snooze before retrying to give the remote a short break.
+            if (exception instanceof NoHttpResponseException) {
+                if (getLogger().isDebugEnabled()) {
+                    getLogger().debug("Sleeping for 5 secs then retrying {} request for remote server {}",
+                            new Object[]{clientContext.getRequest().getRequestLine().getMethod(), clientContext.getTargetHost()});
+                }
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    return false;
+                }
+                return true;
+            }
+            // do not retry more serious exceptions
+            return false;
+        };
+        clientBuilder.setRetryHandler(retryHandler);
+        clientBuilder.disableContentCompression();
+
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+        // set the credentials if appropriate
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+        if (username != null) {
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
         }
 
-        conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks());
-        conMan.setMaxTotal(context.getMaxConcurrentTasks());
-        config = new Config(conMan);
-        final Config existingConfig = configMap.putIfAbsent(baseUrl, config);
+        // Set the proxy if specified
+        HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
+
+        // complete the HTTPClient build
+        client = clientBuilder.build();
 
-        return existingConfig == null ? config : existingConfig;
+        // setup RequestConfig
+        final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+        requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        requestConfigBuilder.setRedirectsEnabled(false);
+        requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        requestConfig = requestConfigBuilder.build();
+    }
+
+    private String getBaseUrl(final String url) {
+        final int index = url.indexOf("/", 9);
+        if (index < 0) {
+            return url;
+        }
+        return url.substring(0, index);
     }
 
     private SSLContext createSSLContext(final SSLContextService service)
@@ -427,9 +525,14 @@ public class PostHTTP extends AbstractProcessor {
                 keystore.load(in, service.getKeyStorePassword().toCharArray());
             }
             builder = builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray());
+            final String alias = keystore.aliases().nextElement();
+            final Certificate cert = keystore.getCertificate(alias);
+            if (cert instanceof X509Certificate) {
+                principal = ((X509Certificate) cert).getSubjectDN();
+            }
         }
 
-        builder = builder.useProtocol(service.getSslAlgorithm());
+        builder = builder.setProtocol(service.getSslAlgorithm());
 
         final SSLContext sslContext = builder.build();
         return sslContext;
@@ -459,14 +562,6 @@ public class PostHTTP extends AbstractProcessor {
 
         final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
         final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
-        final String userAgent = context.getProperty(USER_AGENT).getValue();
-
-        final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
-        requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-        requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-        requestConfigBuilder.setRedirectsEnabled(false);
-        requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-        final RequestConfig requestConfig = requestConfigBuilder.build();
 
         final StreamThrottler throttler = throttlerRef.get();
 
@@ -474,75 +569,25 @@ public class PostHTTP extends AbstractProcessor {
         final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize());
 
         DestinationAccepts destinationAccepts = null;
-        CloseableHttpClient client = null;
         final String transactionId = UUID.randomUUID().toString();
-
-        final AtomicReference<String> dnHolder = new AtomicReference<>("none");
-
-        final Config config = getConfig(url, context);
-        final HttpClientConnectionManager conMan = config.getConnectionManager();
-
-        final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
-        clientBuilder.setConnectionManager(conMan);
-        clientBuilder.setUserAgent(userAgent);
-        clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
-            @Override
-            public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
-                final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
-                final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
-                if (!conn.isOpen()) {
-                    return;
-                }
-
-                final SSLSession sslSession = conn.getSSLSession();
-
-                if (sslSession != null) {
-                    final Certificate[] certChain = sslSession.getPeerCertificates();
-                    if (certChain == null || certChain.length == 0) {
-                        throw new SSLPeerUnverifiedException("No certificates found");
-                    }
-
-                    try {
-                        final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
-                        dnHolder.set(cert.getSubjectDN().getName().trim());
-                    } catch (CertificateException e) {
-                        final String msg = "Could not extract subject DN from SSL session peer certificate";
-                        logger.warn(msg);
-                        throw new SSLPeerUnverifiedException(msg);
-                    }
-                }
-            }
-        });
-
-        clientBuilder.disableAutomaticRetries();
-        clientBuilder.disableContentCompression();
-
-        final String username = context.getProperty(USERNAME).getValue();
-        final String password = context.getProperty(PASSWORD).getValue();
-        // set the credentials if appropriate
-        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-        clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-        if (username != null) {
-            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
-        }
-
-        // Set the proxy if specified
-        HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
-
-        client = clientBuilder.build();
+        final HttpClientContext httpClientContext = HttpClientContext.create();
+        httpClientContext.setUserToken(principal);
 
         // determine whether or not destination accepts flowfile/gzip
-        destinationAccepts = config.getDestinationAccepts();
+        final String baseUrl = getBaseUrl(url);
+        destinationAccepts = destinationAcceptsMap.get(baseUrl);
         if (destinationAccepts == null) {
             try {
-                destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId);
-                config.setDestinationAccepts(destinationAccepts);
+                destinationAccepts = getDestinationAcceptance(sendAsFlowFile, url, transactionId, httpClientContext);
+                if (null == destinationAcceptsMap.putIfAbsent(baseUrl, destinationAccepts)) {
+                    // url indicates a new route, so increase the max allowed open connections
+                    connManager.setMaxTotal(connManager.getMaxTotal() + connManager.getDefaultMaxPerRoute());
+                }
             } catch (final IOException e) {
                 firstFlowFile = session.penalize(firstFlowFile);
                 session.transfer(firstFlowFile, REL_FAILURE);
                 logger.error("Unable to communicate with destination {} to determine whether or not it can accept "
                         + "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e});
-                context.yield();
                 return;
             }
         }
@@ -583,23 +628,25 @@ public class PostHTTP extends AbstractProcessor {
                 }
 
                 try (final OutputStream out = wrappedOut) {
+                    final FlowFilePackager packager;
+                    if (!sendAsFlowFile) {
+                        packager = null;
+                    } else if (accepts.isFlowFileV3Accepted()) {
+                        packager = new FlowFilePackagerV3();
+                    } else if (accepts.isFlowFileV2Accepted()) {
+                        packager = new FlowFilePackagerV2();
+                    } else if (accepts.isFlowFileV1Accepted()) {
+                        packager = new FlowFilePackagerV1();
+                    } else {
+                        packager = null;
+                    }
+
                     for (final FlowFile flowFile : toSend) {
                         session.read(flowFile, new InputStreamCallback() {
                             @Override
                             public void process(final InputStream rawIn) throws IOException {
                                 try (final InputStream in = new BufferedInputStream(rawIn)) {
 
-                                    FlowFilePackager packager = null;
-                                    if (!sendAsFlowFile) {
-                                        packager = null;
-                                    } else if (accepts.isFlowFileV3Accepted()) {
-                                        packager = new FlowFilePackagerV3();
-                                    } else if (accepts.isFlowFileV2Accepted()) {
-                                        packager = new FlowFilePackagerV2();
-                                    } else if (accepts.isFlowFileV1Accepted()) {
-                                        packager = new FlowFilePackagerV1();
-                                    }
-
                                     // if none of the above conditions is met, we should never get here, because
                                     // we will have already verified that at least 1 of the FlowFile packaging
                                     // formats is acceptable if sending as FlowFile.
@@ -625,6 +672,15 @@ public class PostHTTP extends AbstractProcessor {
                     }
 
                     out.flush();
+                } catch (ProcessException pe) {
+                    // Pull out IOExceptions so that HTTPClient can properly do what it needs to do
+                    Throwable t = pe.getCause();
+                    if (t != null && t instanceof IOException) {
+                        IOException ioe = new IOException(t.getMessage());
+                        ioe.setStackTrace(t.getStackTrace());
+                        throw ioe;
+                    }
+                    throw pe;
                 }
             }
         }) {
@@ -639,6 +695,8 @@ public class PostHTTP extends AbstractProcessor {
             }
         };
 
+        final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles";
+
         if (context.getProperty(CHUNKED_ENCODING).isSet()) {
             entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
         }
@@ -654,11 +712,13 @@ public class PostHTTP extends AbstractProcessor {
             } else if (accepts.isFlowFileV1Accepted()) {
                 contentType = APPLICATION_FLOW_FILE_V1;
             } else {
-                logger.error("Cannot send data to {} because the destination does not accept FlowFiles and this processor is "
-                        + "configured to deliver FlowFiles; rolling back session", new Object[]{url});
-                session.rollback();
-                context.yield();
-                IOUtils.closeQuietly(client);
+                logger.error("Cannot send {} to {} because the destination does not accept FlowFiles and this processor is "
+                        + "configured to deliver FlowFiles; routing to failure",
+                        new Object[] {flowFileDescription, url});
+                for (FlowFile flowFile : toSend) {
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                }
                 return;
             }
         } else {
@@ -692,24 +752,17 @@ public class PostHTTP extends AbstractProcessor {
         }
 
         // Do the actual POST
-        final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles";
-
         final String uploadDataRate;
         final long uploadMillis;
         CloseableHttpResponse response = null;
         try {
             final StopWatch stopWatch = new StopWatch(true);
-            response = client.execute(post);
-
-            // consume input stream entirely, ignoring its contents. If we
-            // don't do this, the Connection will not be returned to the pool
-            EntityUtils.consume(response.getEntity());
+            response = client.execute(post, httpClientContext);
             stopWatch.stop();
             uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get());
             uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        } catch (final IOException e) {
+        } catch (final IOException | ProcessException e) {
             logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e});
-            context.yield();
             for (FlowFile flowFile : toSend) {
                 flowFile = session.penalize(flowFile);
                 session.transfer(flowFile, REL_FAILURE);
@@ -718,9 +771,10 @@ public class PostHTTP extends AbstractProcessor {
         } finally {
             if (response != null) {
                 try {
-                    response.close();
-                } catch (final IOException e) {
-                    getLogger().warn("Failed to close HTTP Response due to {}", new Object[]{e});
+                    // consume input stream entirely, ignoring its contents. If we
+                    // don't do this, the Connection will not be returned to the pool
+                    EntityUtils.consume(response.getEntity());
+                } catch (final IOException ignore) {
                 }
             }
         }
@@ -744,10 +798,10 @@ public class PostHTTP extends AbstractProcessor {
             }
 
             if (holdUri == null) {
+                logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI",
+                        new Object[]{flowFileDescription, url, responseCode, responseReason});
                 for (FlowFile flowFile : toSend) {
                     flowFile = session.penalize(flowFile);
-                    logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI",
-                            new Object[]{flowFile, url, responseCode, responseReason});
                     session.transfer(flowFile, REL_FAILURE);
                 }
                 return;
@@ -756,22 +810,20 @@ public class PostHTTP extends AbstractProcessor {
 
         if (holdUri == null) {
             if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) {
+                logger.error("Failed to Post {} to {}: response code was {}:{}",
+                        new Object[]{flowFileDescription, url, responseCode, responseReason});
                 for (FlowFile flowFile : toSend) {
                     flowFile = session.penalize(flowFile);
-                    logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, "
-                                    + "since the destination is temporarily unavailable",
-                            new Object[]{flowFile, url, responseCode, responseReason});
                     session.transfer(flowFile, REL_FAILURE);
                 }
-                context.yield();
                 return;
             }
 
             if (responseCode >= 300) {
+                logger.error("Failed to Post {} to {}: response code was {}:{}",
+                        new Object[]{flowFileDescription, url, responseCode, responseReason});
                 for (FlowFile flowFile : toSend) {
                     flowFile = session.penalize(flowFile);
-                    logger.error("Failed to Post {} to {}: response code was {}:{}",
-                            new Object[]{flowFile, url, responseCode, responseReason});
                     session.transfer(flowFile, REL_FAILURE);
                 }
                 return;
@@ -781,7 +833,7 @@ public class PostHTTP extends AbstractProcessor {
                     new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
 
             for (final FlowFile flowFile : toSend) {
-                session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + dnHolder.get(), uploadMillis, true);
+                session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true);
                 session.transfer(flowFile, REL_SUCCESS);
             }
             return;
@@ -815,54 +867,58 @@ public class PostHTTP extends AbstractProcessor {
 
         final HttpDelete delete = new HttpDelete(fullHoldUri);
         delete.setHeader(TRANSACTION_ID_HEADER, transactionId);
+        delete.setConfig(requestConfig);
 
-        while (true) {
-            try {
-                final HttpResponse holdResponse = client.execute(delete);
-                EntityUtils.consume(holdResponse.getEntity());
-                final int holdStatusCode = holdResponse.getStatusLine().getStatusCode();
-                final String holdReason = holdResponse.getStatusLine().getReasonPhrase();
-                if (holdStatusCode >= 300) {
-                    logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure",
-                            new Object[]{flowFileDescription, holdStatusCode, holdReason});
-
-                    for (FlowFile flowFile : toSend) {
-                        flowFile = session.penalize(flowFile);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
-                    return;
-                }
-
-                logger.info("Successfully Posted {} to {} in {} milliseconds at a rate of {}", new Object[]{flowFileDescription, url, uploadMillis, uploadDataRate});
-
-                for (final FlowFile flowFile : toSend) {
-                    session.getProvenanceReporter().send(flowFile, url);
-                    session.transfer(flowFile, REL_SUCCESS);
-                }
-                return;
-            } catch (final IOException e) {
-                logger.warn("Failed to delete Hold that destination placed on {} due to {}", new Object[]{flowFileDescription, e});
-            }
+        HttpResponse holdResponse = null;
+        try {
+            holdResponse = client.execute(delete, httpClientContext);
+            final int holdStatusCode = holdResponse.getStatusLine().getStatusCode();
+            final String holdReason = holdResponse.getStatusLine().getReasonPhrase();
+            if (holdStatusCode >= 300) {
+                logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure",
+                        new Object[]{flowFileDescription, holdStatusCode, holdReason});
 
-            if (!isScheduled()) {
-                context.yield();
-                logger.warn("Failed to delete Hold that destination placed on {}; Processor has been stopped so routing FlowFile(s) to failure", new Object[]{flowFileDescription});
                 for (FlowFile flowFile : toSend) {
                     flowFile = session.penalize(flowFile);
                     session.transfer(flowFile, REL_FAILURE);
                 }
                 return;
             }
+
+            logger.info("Successfully Posted {} to {} in {} at a rate of {}",
+                    new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
+
+            for (final FlowFile flowFile : toSend) {
+                session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+            return;
+
+        } catch (final IOException e) {
+            logger.warn("Failed to delete Hold that destination placed on {} due to {}; routing to failure", new Object[]{flowFileDescription, e});
+            for (FlowFile flowFile : toSend) {
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+        } finally {
+            if (null != holdResponse) {
+                try {
+                    // consume input stream entirely, ignoring its contents. If we
+                    // don't do this, the Connection will not be returned to the pool
+                    EntityUtils.consume(holdResponse.getEntity());
+                } catch (IOException ignore) {}
+            }
         }
     }
 
-    private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final HttpClient client, final String uri,
-                                                        final ComponentLog logger, final String transactionId) throws IOException {
+    private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final String uri, final String transactionId, final HttpContext httpContext) throws IOException {
         final HttpHead head = new HttpHead(uri);
+        head.setConfig(requestConfig);
         if (sendAsFlowFile) {
             head.addHeader(TRANSACTION_ID_HEADER, transactionId);
         }
-        final HttpResponse response = client.execute(head);
+
+        final HttpResponse response = client.execute(head, httpContext);
 
         // we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile
         // otherwise, we do not bother to make any determinations concerning this compatibility
@@ -901,12 +957,14 @@ public class PostHTTP extends AbstractProcessor {
                     }
                 }
 
-                if (acceptsFlowFileV3) {
-                    logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile");
-                } else if (acceptsFlowFileV2) {
-                    logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile");
-                } else if (acceptsFlowFileV1) {
-                    logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
+                if (getLogger().isDebugEnabled()) {
+                    if (acceptsFlowFileV3) {
+                        getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile");
+                    } else if (acceptsFlowFileV2) {
+                        getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile");
+                    } else if (acceptsFlowFileV1) {
+                        getLogger().debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
+                    }
                 }
             }
 
@@ -921,15 +979,17 @@ public class PostHTTP extends AbstractProcessor {
                 }
             }
 
-            if (acceptsGzip) {
-                logger.debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported");
-            } else {
-                logger.debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression");
+            if (getLogger().isDebugEnabled()) {
+                if (acceptsGzip) {
+                    getLogger().debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported");
+                } else {
+                    getLogger().debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression");
+                }
             }
 
             return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion);
         } else {
-            logger.warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of "
+            getLogger().warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of "
                     + statusCode + ": " + response.getStatusLine().getReasonPhrase());
             return new DestinationAccepts(false, false, false, false, null);
         }
@@ -971,26 +1031,4 @@ public class PostHTTP extends AbstractProcessor {
             return protocolVersion;
         }
     }
-
-    private static class Config {
-
-        private volatile DestinationAccepts destinationAccepts;
-        private final HttpClientConnectionManager conMan;
-
-        public Config(final HttpClientConnectionManager conMan) {
-            this.conMan = conMan;
-        }
-
-        public DestinationAccepts getDestinationAccepts() {
-            return this.destinationAccepts;
-        }
-
-        public void setDestinationAccepts(final DestinationAccepts destinationAccepts) {
-            this.destinationAccepts = destinationAccepts;
-        }
-
-        public HttpClientConnectionManager getConnectionManager() {
-            return conMan;
-        }
-    }
 }