You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/09 21:35:30 UTC

[2/5] incubator-nifi git commit: NIFI-239: Updated PostHTTP to not use deprecated methods and classes in Apache HTTP Client

NIFI-239: Updated PostHTTP to not use deprecated methods and classes in Apache HTTP Client


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d99180a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d99180a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d99180a7

Branch: refs/heads/develop
Commit: d99180a7160ac8a189507168dc97a35e0501eef9
Parents: 76ea1c6
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 9 13:18:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 9 13:18:00 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/PostHTTP.java      | 227 +++++++++++--------
 1 file changed, 136 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d99180a7/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index 14f8a28..ee16610 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -16,12 +16,18 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,20 +45,45 @@ import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
 import javax.security.cert.X509Certificate;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.http.Header;
+import org.apache.http.HttpException;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpResponseInterceptor;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.ManagedHttpClientConnection;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContexts;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.entity.ContentProducer;
+import org.apache.http.entity.EntityTemplate;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.util.EntityUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
-import org.apache.nifi.stream.io.StreamThrottler;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
@@ -69,7 +100,12 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import org.apache.nifi.stream.io.StreamThrottler;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFilePackager;
 import org.apache.nifi.util.FlowFilePackagerV1;
 import org.apache.nifi.util.FlowFilePackagerV2;
@@ -78,31 +114,6 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.StopWatch;
 
-import org.apache.http.Header;
-import org.apache.http.HttpException;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseInterceptor;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpHead;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.params.ClientPNames;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.HttpRoutedConnection;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.ssl.SSLSocketFactory;
-import org.apache.http.entity.ContentProducer;
-import org.apache.http.entity.EntityTemplate;
-import org.apache.http.impl.client.AbstractHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.apache.http.protocol.ExecutionContext;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.EntityUtils;
-
 import com.sun.jersey.api.client.ClientResponse.Status;
 
 @SupportsBatching
@@ -308,28 +319,64 @@ public class PostHTTP extends AbstractProcessor {
             return config;
         }
 
-        final ClientConnectionManager conMan = new PoolingClientConnectionManager();
-        registerUrlWithManager(url, context, conMan);
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext;
+        try {
+            sslContext = createSSLContext(sslContextService);
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+        
+        final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null,
+                SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+
+        final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+                .register("https", sslsf).build();
+        
+        final PoolingHttpClientConnectionManager conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+        conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks());
+        conMan.setMaxTotal(context.getMaxConcurrentTasks());
         config = new Config(conMan);
         final Config existingConfig = configMap.putIfAbsent(baseUrl, config);
 
         return (existingConfig == null) ? config : existingConfig;
     }
+    
+    
+    /**
+     * Builds the client SSLContext from the service's truststore and keystore files, or returns
+     * the JVM default context when no SSL Context Service is configured (service is null).
+     */
+    private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
+        CertificateException, KeyManagementException, UnrecoverableKeyException {
+        if (service == null) {
+            return SSLContext.getDefault();
+        }
+        final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
+        try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
+            truststore.load(in, service.getTrustStorePassword().toCharArray());
+        }
+        final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
+        try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
+            keystore.load(in, service.getKeyStorePassword().toCharArray());
+        }
+        return SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
+                .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build();
+    }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
         final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
-
-        final HttpParams httpParams = new BasicHttpParams();
-        HttpConnectionParams.setConnectionTimeout(httpParams, context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-        HttpConnectionParams.setSoTimeout(httpParams, context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-        httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, false);
         final String userAgent = context.getProperty(USER_AGENT).getValue();
-        if (userAgent != null) {
-            httpParams.setParameter("http.useragent", userAgent);
-        }
 
+        final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+        requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        requestConfigBuilder.setRedirectsEnabled(false);
+        requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        final RequestConfig requestConfig = requestConfigBuilder.build();
+        
         final StreamThrottler throttler = throttlerRef.get();
         final ProcessorLog logger = getLogger();
 
@@ -339,7 +386,7 @@ public class PostHTTP extends AbstractProcessor {
 
         final List<FlowFile> toSend = new ArrayList<>();
         DestinationAccepts destinationAccepts = null;
-        HttpClient client = null;
+        CloseableHttpClient client = null;
         final String transactionId = UUID.randomUUID().toString();
 
         final ObjectHolder<String> dnHolder = new ObjectHolder<>("none");
@@ -371,15 +418,20 @@ public class PostHTTP extends AbstractProcessor {
 
             if (client == null || destinationAccepts == null) {
                 final Config config = getConfig(url, context);
-                final ClientConnectionManager conMan = config.getConnectionManager();
-                client = new DefaultHttpClient(conMan, httpParams);
-
-                ((AbstractHttpClient) client).addResponseInterceptor(new HttpResponseInterceptor() {
+                final HttpClientConnectionManager conMan = config.getConnectionManager();
+                
+                final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+                clientBuilder.setConnectionManager(conMan);
+                clientBuilder.setUserAgent(userAgent);
+                clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
                     @Override
-                    public void process(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
-                        final HttpRoutedConnection httpRoutedConnection = (HttpRoutedConnection) context.getAttribute(ExecutionContext.HTTP_CONNECTION);
-                        if (httpRoutedConnection.isSecure()) {
-                            final X509Certificate[] certChain = httpRoutedConnection.getSSLSession().getPeerCertificateChain();
+                    public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
+                        HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
+                        ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
+                        SSLSession sslSession = conn.getSSLSession();
+                        
+                        if ( sslSession != null ) {
+                            final X509Certificate[] certChain = sslSession.getPeerCertificateChain();
                             if (certChain == null || certChain.length == 0) {
                                 throw new SSLPeerUnverifiedException("No certificates found");
                             }
@@ -389,6 +441,23 @@ public class PostHTTP extends AbstractProcessor {
                         }
                     }
                 });
+                
+                clientBuilder.disableAutomaticRetries();
+                clientBuilder.disableContentCompression();
+                
+                final String username = context.getProperty(USERNAME).getValue();
+                final String password = context.getProperty(PASSWORD).getValue();
+                // set the credentials if appropriate
+                if (username != null) {
+                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                    if (password == null) {
+                        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
+                    } else {
+                        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+                    };
+                    clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                }
+                client = clientBuilder.build();
 
                 // determine whether or not destination accepts flowfile/gzip
                 destinationAccepts = config.getDestinationAccepts();
@@ -491,7 +560,8 @@ public class PostHTTP extends AbstractProcessor {
 
         entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
         post.setEntity(entity);
-
+        post.setConfig(requestConfig);
+        
         final String contentType;
         if (sendAsFlowFile) {
             if (accepts.isFlowFileV3Accepted()) {
@@ -537,7 +607,7 @@ public class PostHTTP extends AbstractProcessor {
 
         final String uploadDataRate;
         final long uploadMillis;
-        final HttpResponse response;
+        CloseableHttpResponse response = null;
         try {
             final StopWatch stopWatch = new StopWatch(true);
             response = client.execute(post);
@@ -556,6 +626,14 @@ public class PostHTTP extends AbstractProcessor {
                 session.transfer(flowFile, REL_FAILURE);
             }
             return;
+        } finally {
+            if ( response != null ) {
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    getLogger().warn("Failed to close HTTP Response due to {}", new Object[] {e});
+                }
+            }
         }
 
         // If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent
@@ -686,39 +764,6 @@ public class PostHTTP extends AbstractProcessor {
         }
     }
 
-    private void registerUrlWithManager(final String url, final ProcessContext processContext, final ClientConnectionManager conMan) {
-        URI uriObject;
-        try {
-            uriObject = new URI(url);
-        } catch (URISyntaxException e) {
-            throw new ProcessException(e);  // won't happen because of our 
-        }
-        int port = uriObject.getPort();
-        if (port == -1) {
-            port = 443;
-        }
-
-        final boolean secure = (url.toLowerCase().startsWith("https"));
-        if (!secure) {
-            return;
-        }
-
-        final SSLContext sslContext = createSslContext(processContext);
-        final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
-        final Scheme newScheme = new Scheme("https", port, sslSocketFactory);
-        conMan.getSchemeRegistry().register(newScheme);
-    }
-
-    /**
-     * Creates a SSL context based on the processor's optional properties.
-     * <p/>
-     *
-     * @return an SSLContext instance
-     */
-    private SSLContext createSslContext(final ProcessContext context) {
-        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        return (sslContextService == null) ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED);
-    }
 
     private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException {
         final HttpHead head = new HttpHead(uri);
@@ -838,9 +883,9 @@ public class PostHTTP extends AbstractProcessor {
     private static class Config {
 
         private volatile DestinationAccepts destinationAccepts;
-        private final ClientConnectionManager conMan;
+        private final HttpClientConnectionManager conMan;
 
-        public Config(final ClientConnectionManager conMan) {
+        public Config(final HttpClientConnectionManager conMan) {
             this.conMan = conMan;
         }
 
@@ -852,7 +897,7 @@ public class PostHTTP extends AbstractProcessor {
             this.destinationAccepts = destinationAccepts;
         }
 
-        public ClientConnectionManager getConnectionManager() {
+        public HttpClientConnectionManager getConnectionManager() {
             return conMan;
         }
     }