You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/04/21 08:57:47 UTC

nifi git commit: NIFI-1521 Backported AMQP SSL changes to 0.x support branch. (+1 squashed commit)

Repository: nifi
Updated Branches:
  refs/heads/0.x 3bf9e2894 -> f964c8b22


NIFI-1521 Backported AMQP SSL changes to 0.x support branch. (+1 squashed commit)

Squashed commits:
[e748bb0] NIFI-1521 Added SSL configuration to AMQP processor.

fixed build failure (+5 squashed commits)
Squashed commits:
[a3405f8] NIFI-1521 fixed build failure
[bf91743] NIFI-1521 fixed name/displayName in properties
[a44beaa] NIFI-1521 Added unit test
[c523689] NIFI-1521 Added client auth property and reverted modification on SSL context service
[75f3457] NIFI-1521 Allows use of SSL in AMQP Processor

This closes #232.

Signed-off-by: Andy LoPresto <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f964c8b2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f964c8b2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f964c8b2

Branch: refs/heads/0.x
Commit: f964c8b2215a3e694b39f549b51925bce104f80b
Parents: 3bf9e28
Author: Pierre Villard <pi...@gmail.com>
Authored: Wed Feb 17 16:30:09 2016 +0100
Committer: Andy LoPresto <al...@apache.org>
Committed: Wed Apr 20 23:57:09 2016 -0700

----------------------------------------------------------------------
 .../nifi-amqp-bundle/nifi-amqp-nar/pom.xml      |  5 ++
 .../nifi-amqp-processors/pom.xml                |  5 +-
 .../amqp/processors/AbstractAMQPProcessor.java  | 53 +++++++++++++-
 .../processors/AbstractAMQPProcessorTest.java   | 75 ++++++++++++++++++++
 4 files changed, 135 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f964c8b2/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
index 049de77..d563313 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
@@ -30,6 +30,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-amqp-processors</artifactId>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f964c8b2/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
index 83339b9..361b69f 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
@@ -33,7 +33,10 @@
 			<groupId>org.apache.nifi</groupId>
 			<artifactId>nifi-api</artifactId>
 		</dependency>
-		
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
 		<dependency>
 			<groupId>org.apache.nifi</groupId>
 			<artifactId>nifi-processor-utils</artifactId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f964c8b2/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index e572870..222e74d 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -19,8 +19,10 @@ package org.apache.nifi.amqp.processors;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
+import javax.net.ssl.SSLContext;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -28,7 +30,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
@@ -84,6 +87,23 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
             .allowableValues("0.9.1")
             .defaultValue("0.9.1")
             .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("ssl-context-service")
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+            .name("ssl-client-auth")
+            .displayName("Client Auth")
+            .description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. "
+                    + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+                    + "has been defined and enabled.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.values())
+            .defaultValue("REQUIRED")
+            .build();
 
     static List<PropertyDescriptor> descriptors = new ArrayList<>();
 
@@ -98,6 +118,8 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
         descriptors.add(USER);
         descriptors.add(PASSWORD);
         descriptors.add(AMQP_VERSION);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(CLIENT_AUTH);
     }
 
     protected volatile Connection amqpConnection;
@@ -192,6 +214,33 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
             cf.setVirtualHost(vHost);
         }
 
+        // handles TLS/SSL aspects
+        final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+        final SSLContext sslContext;
+
+        if (sslService != null) {
+            final SSLContextService.ClientAuth clientAuth;
+            if (StringUtils.isBlank(rawClientAuth)) {
+                clientAuth = SSLContextService.ClientAuth.REQUIRED;
+            } else {
+                try {
+                    clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+                } catch (final IllegalArgumentException iae) {
+                    throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+                            rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+                }
+            }
+            sslContext = sslService.createSSLContext(clientAuth);
+        } else {
+            sslContext = null;
+        }
+
+        // check if the ssl context is set and add it to the factory if so
+        if (sslContext != null) {
+            cf.useSslProtocol(sslContext);
+        }
+
         try {
             Connection connection = cf.newConnection();
             return connection;

http://git-wip-us.apache.org/repos/asf/nifi/blob/f964c8b2/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
new file mode 100644
index 0000000..09f94dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for the AbstractAMQPProcessor class
+ */
+public class AbstractAMQPProcessorTest {
+
+    MockAbstractAMQPProcessor processor;
+    private TestRunner testRunner;
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new MockAbstractAMQPProcessor();
+        testRunner = TestRunners.newTestRunner(processor);
+    }
+
+    @Test(expected = ProviderCreationException.class)
+    public void testConnectToCassandraWithSSLBadClientAuth() throws Exception {
+        SSLContextService sslService = mock(SSLContextService.class);
+        when(sslService.getIdentifier()).thenReturn("ssl-context");
+        testRunner.addControllerService("ssl-context", sslService);
+        testRunner.enableControllerService(sslService);
+        testRunner.setProperty(AbstractAMQPProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
+        testRunner.setProperty(AbstractAMQPProcessor.HOST, "test");
+        testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999");
+        testRunner.setProperty(AbstractAMQPProcessor.USER, "test");
+        testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test");
+        testRunner.assertValid(sslService);
+        testRunner.setProperty(AbstractAMQPProcessor.CLIENT_AUTH, "BAD");
+        processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory());
+    }
+
+    /**
+     * Provides a stubbed processor instance for testing
+     */
+    public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor<AMQPConsumer> {
+        @Override
+        protected void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException {
+            // nothing to do
+        }
+        @Override
+        protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) {
+            return null;
+        }
+    }
+}