You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/09 21:35:30 UTC
[2/5] incubator-nifi git commit: NIFI-239: Updated PostHTTP to not
use deprecated methods and classes in Apache HTTP Client
NIFI-239: Updated PostHTTP to not use deprecated methods and classes in Apache HTTP Client
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d99180a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d99180a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d99180a7
Branch: refs/heads/develop
Commit: d99180a7160ac8a189507168dc97a35e0501eef9
Parents: 76ea1c6
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 9 13:18:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 9 13:18:00 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/PostHTTP.java | 227 +++++++++++--------
1 file changed, 136 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d99180a7/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index 14f8a28..ee16610 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -16,12 +16,18 @@
*/
package org.apache.nifi.processors.standard;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -39,20 +45,45 @@ import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import javax.servlet.http.HttpServletResponse;
+import org.apache.http.Header;
+import org.apache.http.HttpException;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseInterceptor;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.ManagedHttpClientConnection;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContexts;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.ContentProducer;
+import org.apache.http.entity.EntityTemplate;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.util.EntityUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -69,7 +100,12 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
@@ -78,31 +114,6 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
-import org.apache.http.Header;
-import org.apache.http.HttpException;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseInterceptor;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpHead;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.params.ClientPNames;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.HttpRoutedConnection;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.ssl.SSLSocketFactory;
-import org.apache.http.entity.ContentProducer;
-import org.apache.http.entity.EntityTemplate;
-import org.apache.http.impl.client.AbstractHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.apache.http.protocol.ExecutionContext;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.EntityUtils;
-
import com.sun.jersey.api.client.ClientResponse.Status;
@SupportsBatching
@@ -308,28 +319,64 @@ public class PostHTTP extends AbstractProcessor {
return config;
}
- final ClientConnectionManager conMan = new PoolingClientConnectionManager();
- registerUrlWithManager(url, context, conMan);
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final SSLContext sslContext;
+ try {
+ sslContext = createSSLContext(sslContextService);
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null,
+ SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+ final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("https", sslsf).build();
+
+ final PoolingHttpClientConnectionManager conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks());
+ conMan.setMaxTotal(context.getMaxConcurrentTasks());
config = new Config(conMan);
final Config existingConfig = configMap.putIfAbsent(baseUrl, config);
return (existingConfig == null) ? config : existingConfig;
}
+
+
+ /**
+  * Builds the {@link SSLContext} used for HTTPS connections from the trust store and key store
+  * exposed by the given SSL context service.
+  * <p>
+  * Each store is loaded only when the service reports a file for it, so a trust-store-only or
+  * key-store-only configuration no longer fails with a NullPointerException. When no service is
+  * supplied at all, the default SSL context is returned.
+  *
+  * @param service the configured SSL context service; may be {@code null} when the property is not set
+  * @return the SSL context to hand to the HTTPS socket factory; never {@code null}
+  * @throws KeyStoreException if a key store of the configured type cannot be created
+  * @throws IOException if a store file cannot be read
+  * @throws NoSuchAlgorithmException if a required algorithm is unavailable
+  * @throws CertificateException if a certificate in a store cannot be loaded
+  * @throws KeyManagementException if the SSL context cannot be initialized
+  * @throws UnrecoverableKeyException if a key cannot be recovered with the configured password
+  */
+ private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
+         CertificateException, KeyManagementException, UnrecoverableKeyException
+ {
+     // The SSL Context Service property is optional; without it there are no stores to load.
+     if (service == null) {
+         return SSLContexts.createDefault();
+     }
+
+     // Fully qualified so that this method does not require a new import.
+     org.apache.http.conn.ssl.SSLContextBuilder builder = SSLContexts.custom();
+
+     final String truststoreFile = service.getTrustStoreFile();
+     if (truststoreFile != null) {
+         final String truststorePassword = service.getTrustStorePassword();
+         final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
+         try (final InputStream in = new FileInputStream(new File(truststoreFile))) {
+             truststore.load(in, truststorePassword == null ? null : truststorePassword.toCharArray());
+         }
+
+         // NOTE(review): TrustSelfSignedStrategy accepts any self-signed server certificate, even one
+         // that is not in the trust store - confirm that this is intended.
+         builder = builder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy());
+     }
+
+     final String keystoreFile = service.getKeyStoreFile();
+     if (keystoreFile != null) {
+         final String keystorePassword = service.getKeyStorePassword();
+         final char[] keystorePasswordChars = keystorePassword == null ? null : keystorePassword.toCharArray();
+         final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
+         try (final InputStream in = new FileInputStream(new File(keystoreFile))) {
+             keystore.load(in, keystorePasswordChars);
+         }
+
+         builder = builder.loadKeyMaterial(keystore, keystorePasswordChars);
+     }
+
+     return builder.build();
+ }
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
-
- final HttpParams httpParams = new BasicHttpParams();
- HttpConnectionParams.setConnectionTimeout(httpParams, context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
- HttpConnectionParams.setSoTimeout(httpParams, context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
- httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, false);
final String userAgent = context.getProperty(USER_AGENT).getValue();
- if (userAgent != null) {
- httpParams.setParameter("http.useragent", userAgent);
- }
+ final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ requestConfigBuilder.setRedirectsEnabled(false);
+ requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+ final RequestConfig requestConfig = requestConfigBuilder.build();
+
final StreamThrottler throttler = throttlerRef.get();
final ProcessorLog logger = getLogger();
@@ -339,7 +386,7 @@ public class PostHTTP extends AbstractProcessor {
final List<FlowFile> toSend = new ArrayList<>();
DestinationAccepts destinationAccepts = null;
- HttpClient client = null;
+ CloseableHttpClient client = null;
final String transactionId = UUID.randomUUID().toString();
final ObjectHolder<String> dnHolder = new ObjectHolder<>("none");
@@ -371,15 +418,20 @@ public class PostHTTP extends AbstractProcessor {
if (client == null || destinationAccepts == null) {
final Config config = getConfig(url, context);
- final ClientConnectionManager conMan = config.getConnectionManager();
- client = new DefaultHttpClient(conMan, httpParams);
-
- ((AbstractHttpClient) client).addResponseInterceptor(new HttpResponseInterceptor() {
+ final HttpClientConnectionManager conMan = config.getConnectionManager();
+
+ final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+ clientBuilder.setConnectionManager(conMan);
+ clientBuilder.setUserAgent(userAgent);
+ clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
@Override
- public void process(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
- final HttpRoutedConnection httpRoutedConnection = (HttpRoutedConnection) context.getAttribute(ExecutionContext.HTTP_CONNECTION);
- if (httpRoutedConnection.isSecure()) {
- final X509Certificate[] certChain = httpRoutedConnection.getSSLSession().getPeerCertificateChain();
+ public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
+ HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
+ ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
+ SSLSession sslSession = conn.getSSLSession();
+
+ if ( sslSession != null ) {
+ final X509Certificate[] certChain = sslSession.getPeerCertificateChain();
if (certChain == null || certChain.length == 0) {
throw new SSLPeerUnverifiedException("No certificates found");
}
@@ -389,6 +441,23 @@ public class PostHTTP extends AbstractProcessor {
}
}
});
+
+ clientBuilder.disableAutomaticRetries();
+ clientBuilder.disableContentCompression();
+
+ final String username = context.getProperty(USERNAME).getValue();
+ final String password = context.getProperty(PASSWORD).getValue();
+ // set the credentials if appropriate
+ if (username != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ if (password == null) {
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
+ } else {
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ };
+ clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
+ client = clientBuilder.build();
// determine whether or not destination accepts flowfile/gzip
destinationAccepts = config.getDestinationAccepts();
@@ -491,7 +560,8 @@ public class PostHTTP extends AbstractProcessor {
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
post.setEntity(entity);
-
+ post.setConfig(requestConfig);
+
final String contentType;
if (sendAsFlowFile) {
if (accepts.isFlowFileV3Accepted()) {
@@ -537,7 +607,7 @@ public class PostHTTP extends AbstractProcessor {
final String uploadDataRate;
final long uploadMillis;
- final HttpResponse response;
+ CloseableHttpResponse response = null;
try {
final StopWatch stopWatch = new StopWatch(true);
response = client.execute(post);
@@ -556,6 +626,14 @@ public class PostHTTP extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE);
}
return;
+ } finally {
+ if ( response != null ) {
+ try {
+ response.close();
+ } catch (IOException e) {
+ getLogger().warn("Failed to close HTTP Response due to {}", new Object[] {e});
+ }
+ }
}
// If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent
@@ -686,39 +764,6 @@ public class PostHTTP extends AbstractProcessor {
}
}
- private void registerUrlWithManager(final String url, final ProcessContext processContext, final ClientConnectionManager conMan) {
- URI uriObject;
- try {
- uriObject = new URI(url);
- } catch (URISyntaxException e) {
- throw new ProcessException(e); // won't happen because of our
- }
- int port = uriObject.getPort();
- if (port == -1) {
- port = 443;
- }
-
- final boolean secure = (url.toLowerCase().startsWith("https"));
- if (!secure) {
- return;
- }
-
- final SSLContext sslContext = createSslContext(processContext);
- final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
- final Scheme newScheme = new Scheme("https", port, sslSocketFactory);
- conMan.getSchemeRegistry().register(newScheme);
- }
-
- /**
- * Creates a SSL context based on the processor's optional properties.
- * <p/>
- *
- * @return an SSLContext instance
- */
- private SSLContext createSslContext(final ProcessContext context) {
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- return (sslContextService == null) ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED);
- }
private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException {
final HttpHead head = new HttpHead(uri);
@@ -838,9 +883,9 @@ public class PostHTTP extends AbstractProcessor {
private static class Config {
private volatile DestinationAccepts destinationAccepts;
- private final ClientConnectionManager conMan;
+ private final HttpClientConnectionManager conMan;
- public Config(final ClientConnectionManager conMan) {
+ public Config(final HttpClientConnectionManager conMan) {
this.conMan = conMan;
}
@@ -852,7 +897,7 @@ public class PostHTTP extends AbstractProcessor {
this.destinationAccepts = destinationAccepts;
}
- public ClientConnectionManager getConnectionManager() {
+ public HttpClientConnectionManager getConnectionManager() {
return conMan;
}
}