You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/02 20:32:07 UTC

[10/50] [abbrv] nifi git commit: NIFI-908 Added support for SSL in JMS connections. - Added SSL context to JMS producer and consumer processors - Tony Kurc Amended patch to check SSL need by scheme and exception consistency Reviewed by Tony Kurc (tkurc@apache.org)

NIFI-908 Added support for SSL in JMS connections.
 - Added SSL context to JMS producer and consumer processors
 - Tony Kurc amended the patch to determine from the URL scheme whether SSL is needed, and to make exception handling consistent
Reviewed by Tony Kurc (tkurc@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/26edab31
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/26edab31
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/26edab31

Branch: refs/heads/NIFI-730
Commit: 26edab3185008f1d34647ff3c11ba8b87815de02
Parents: 8d2f9bc
Author: Luke Williamson <lu...@gmail.com>
Authored: Mon Oct 26 00:29:05 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 26 00:29:05 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/JmsConsumer.java   |  2 +
 .../apache/nifi/processors/standard/PutJMS.java |  2 +
 .../processors/standard/util/JmsFactory.java    | 79 ++++++++++++++++++--
 .../processors/standard/util/JmsProperties.java |  8 ++
 4 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index b53d62f..461d381 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -25,6 +25,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_T
 import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
 import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
 import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
 import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
 import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
 import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
@@ -87,6 +88,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
         descriptors.add(BATCH_SIZE);
         descriptors.add(USERNAME);
         descriptors.add(PASSWORD);
+        descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(ACKNOWLEDGEMENT_MODE);
         descriptors.add(MESSAGE_SELECTOR);
         descriptors.add(JMS_PROPS_TO_ATTRIBUTES);

http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index dff5a6b..b8902a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -47,6 +47,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QU
 import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
 import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
 import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
+import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -122,6 +123,7 @@ public class PutJMS extends AbstractProcessor {
         descriptors.add(BATCH_SIZE);
         descriptors.add(USERNAME);
         descriptors.add(PASSWORD);
+        descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(MESSAGE_TYPE);
         descriptors.add(MESSAGE_PRIORITY);
         descriptors.add(REPLY_TO_QUEUE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
index 35a65dc..ca5df9f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
@@ -28,12 +28,15 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUB
 import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
 import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
 import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
 import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
 import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
 import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,12 +60,15 @@ import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.processor.ProcessContext;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
 public class JmsFactory {
 
@@ -348,10 +354,43 @@ public class JmsFactory {
     }
 
     private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException {
-        final String url = context.getProperty(URL).getValue();
+        final URI uri;
+        try {
+            uri = new URI(context.getProperty(URL).getValue());
+        } catch (URISyntaxException e) {
+            // Should not happen - URL was validated
+            throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
+        }
         final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final String provider = context.getProperty(JMS_PROVIDER).getValue();
-        return createConnectionFactory(url, timeoutMillis, provider);
+        if (uri.getScheme().equals("ssl") || (URISupport.isCompositeURI(uri) && compositeURIHasSSL(uri))) {
+            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+            if (sslContextService == null) {
+                throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
+            }
+            return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(),
+                    sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword());
+        } else {
+            return createConnectionFactory(uri, timeoutMillis, provider);
+        }
+    }
+
+    private static boolean compositeURIHasSSL(URI uri) {
+        try {
+            CompositeData compositeData = URISupport.parseComposite(uri);
+            for(URI component : compositeData.getComponents()){
+                if(component.getScheme().equals("ssl")){
+                    return true;
+                }
+            }
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e);
+        }
+        return false;
+    }
+
+    public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException {
+        return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider);
     }
 
     public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException {
@@ -366,6 +405,36 @@ public class JmsFactory {
         }
     }
 
+    public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider,
+            final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
+        return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword);
+    }
+
+    public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider,
+                            final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
+        switch (jmsProvider) {
+            case ACTIVEMQ_PROVIDER: {
+                final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url);
+                try {
+                    factory.setKeyStore(keystore);
+                } catch (Exception e) {
+                    throw new JMSException("Problem Setting the KeyStore: " + e.getMessage());
+                }
+                factory.setKeyStorePassword(keystorePassword);
+                try {
+                    factory.setTrustStore(truststore);
+                } catch (Exception e) {
+                    throw new JMSException("Problem Setting the TrustStore: " + e.getMessage());
+                }
+                factory.setTrustStorePassword(truststorePassword);
+                factory.setSendTimeout(timeoutMillis);
+                return factory;
+            }
+            default:
+                throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
+        }
+    }
+
     public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
         final Map<String, String> attributes = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
index ed73569..f538624 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard.util;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
 
 public class JmsProperties {
 
@@ -177,4 +178,11 @@ public class JmsProperties {
             .defaultValue("1 MB")
             .build();
 
+    // JMS SSL Properties
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The Controller Service to use in order to obtain an SSL Context.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
 }