You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/10/26 05:36:56 UTC
nifi git commit: NIFI-908 Added support for SSL in JMS connections.
- Added SSL context to JMS producer and consumer processors
- Tony Kurc Amended patch to check SSL need by scheme and exception consistency
Reviewed by Tony Kurc (tkurc@apache.org)
Repository: nifi
Updated Branches:
refs/heads/master 8d2f9bc64 -> 26edab318
NIFI-908 Added support for SSL in JMS connections.
- Added SSL context to JMS producer and consumer processors
- Tony Kurc Amended patch to check SSL need by scheme and exception consistency
Reviewed by Tony Kurc (tkurc@apache.org)
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/26edab31
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/26edab31
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/26edab31
Branch: refs/heads/master
Commit: 26edab3185008f1d34647ff3c11ba8b87815de02
Parents: 8d2f9bc
Author: Luke Williamson <lu...@gmail.com>
Authored: Mon Oct 26 00:29:05 2015 -0400
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Oct 26 00:29:05 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/JmsConsumer.java | 2 +
.../apache/nifi/processors/standard/PutJMS.java | 2 +
.../processors/standard/util/JmsFactory.java | 79 ++++++++++++++++++--
.../processors/standard/util/JmsProperties.java | 8 ++
4 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index b53d62f..461d381 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -25,6 +25,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_T
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
@@ -87,6 +88,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
descriptors.add(BATCH_SIZE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
+ descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(ACKNOWLEDGEMENT_MODE);
descriptors.add(MESSAGE_SELECTOR);
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index dff5a6b..b8902a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -47,6 +47,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QU
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
+import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import java.io.IOException;
import java.io.InputStream;
@@ -122,6 +123,7 @@ public class PutJMS extends AbstractProcessor {
descriptors.add(BATCH_SIZE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
+ descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(MESSAGE_TYPE);
descriptors.add(MESSAGE_PRIORITY);
descriptors.add(REPLY_TO_QUEUE);
http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
index 35a65dc..ca5df9f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
@@ -28,12 +28,15 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUB
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@@ -57,12 +60,15 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.processor.ProcessContext;
-
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
public class JmsFactory {
@@ -348,10 +354,43 @@ public class JmsFactory {
}
private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException {
- final String url = context.getProperty(URL).getValue();
+ final URI uri;
+ try {
+ uri = new URI(context.getProperty(URL).getValue());
+ } catch (URISyntaxException e) {
+ // Should not happen - URL was validated
+ throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
+ }
final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final String provider = context.getProperty(JMS_PROVIDER).getValue();
- return createConnectionFactory(url, timeoutMillis, provider);
+ if (uri.getScheme().equals("ssl") || (URISupport.isCompositeURI(uri) && compositeURIHasSSL(uri))) {
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService == null) {
+ throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
+ }
+ return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(),
+ sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword());
+ } else {
+ return createConnectionFactory(uri, timeoutMillis, provider);
+ }
+ }
+
+ private static boolean compositeURIHasSSL(URI uri) {
+ try {
+ CompositeData compositeData = URISupport.parseComposite(uri);
+ for(URI component : compositeData.getComponents()){
+ if(component.getScheme().equals("ssl")){
+ return true;
+ }
+ }
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e);
+ }
+ return false;
+ }
+
+ public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException {
+ return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider);
}
public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException {
@@ -366,6 +405,36 @@ public class JmsFactory {
}
}
+ public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider,
+ final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
+ return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword);
+ }
+
+ public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider,
+ final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
+ switch (jmsProvider) {
+ case ACTIVEMQ_PROVIDER: {
+ final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url);
+ try {
+ factory.setKeyStore(keystore);
+ } catch (Exception e) {
+ throw new JMSException("Problem Setting the KeyStore: " + e.getMessage());
+ }
+ factory.setKeyStorePassword(keystorePassword);
+ try {
+ factory.setTrustStore(truststore);
+ } catch (Exception e) {
+ throw new JMSException("Problem Setting the TrustStore: " + e.getMessage());
+ }
+ factory.setTrustStorePassword(truststorePassword);
+ factory.setSendTimeout(timeoutMillis);
+ return factory;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
+ }
+ }
+
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
final Map<String, String> attributes = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/nifi/blob/26edab31/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
index ed73569..f538624 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard.util;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
public class JmsProperties {
@@ -177,4 +178,11 @@ public class JmsProperties {
.defaultValue("1 MB")
.build();
+ // JMS SSL Properties
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The Controller Service to use in order to obtain an SSL Context.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
}