You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/04/20 05:25:55 UTC

nifi git commit: NIFI-1197 checkstyle (+2 squashed commits) Squashed commits: [b4e9b5f] NIFI-1197 fixed name/displayName on properties [d39f82b] NIFI-1197 Added SSL support for MongoDB processors

Repository: nifi
Updated Branches:
  refs/heads/master 0fed158d1 -> 0f05c77a7


NIFI-1197 checkstyle (+2 squashed commits)
Squashed commits:
[b4e9b5f] NIFI-1197 fixed name/displayName on properties
[d39f82b] NIFI-1197 Added SSL support for MongoDB processors

This closes #360.

Signed-off-by: Andy LoPresto <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f05c77a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f05c77a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f05c77a

Branch: refs/heads/master
Commit: 0f05c77a7376bc44ca1e2242c03cc11f201e6a1f
Parents: 0fed158
Author: Pierre Villard <pi...@gmail.com>
Authored: Sun Apr 17 17:24:59 2016 +0200
Committer: Andy LoPresto <al...@apache.org>
Committed: Tue Apr 19 20:25:19 2016 -0700

----------------------------------------------------------------------
 .../nifi-mongodb-nar/pom.xml                    |   7 +-
 .../nifi-mongodb-processors/pom.xml             |   4 +
 .../mongodb/AbstractMongoProcessor.java         |  73 ++++++++++++-
 .../nifi/processors/mongodb/GetMongo.java       |  41 ++++----
 .../nifi/processors/mongodb/PutMongo.java       |  41 ++++----
 .../mongodb/AbstractMongoProcessorTest.java     | 104 +++++++++++++++++++
 nifi-nar-bundles/nifi-mongodb-bundle/pom.xml    |  12 ++-
 pom.xml                                         |   2 +-
 8 files changed, 235 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
index a17b05f..e714da3 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
@@ -23,7 +23,6 @@
     </parent>
 
     <artifactId>nifi-mongodb-nar</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
     <packaging>nar</packaging>
     <properties>
         <maven.javadoc.skip>true</maven.javadoc.skip>
@@ -33,8 +32,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mongodb-processors</artifactId>
-            <version>1.0.0-SNAPSHOT</version>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index ff67f79..0a26cd6 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -39,6 +39,10 @@
             <artifactId>nifi-processor-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index fae007f..7e3a196 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -19,16 +19,26 @@
 package org.apache.nifi.processors.mongodb;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
 import org.bson.Document;
 
 import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
@@ -52,6 +62,34 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+        .name("ssl-context-service")
+        .displayName("SSL Context Service")
+        .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+                + "connections.")
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
+    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+        .name("ssl-client-auth")
+        .displayName("Client Auth")
+        .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+                + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+                + "has been defined and enabled.")
+        .required(false)
+        .allowableValues(SSLContextService.ClientAuth.values())
+        .defaultValue("REQUIRED")
+        .build();
+
+    static List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+    static {
+        descriptors.add(URI);
+        descriptors.add(DATABASE_NAME);
+        descriptors.add(COLLECTION_NAME);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(CLIENT_AUTH);
+    }
 
     protected MongoClient mongoClient;
 
@@ -63,15 +101,57 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
 
         getLogger().info("Creating MongoClient");
 
+        // Set up the client for secure (SSL/TLS communications) if configured to do so
+        final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+        final SSLContext sslContext;
+
+        if (sslService != null) {
+            final SSLContextService.ClientAuth clientAuth;
+            if (StringUtils.isBlank(rawClientAuth)) {
+                clientAuth = SSLContextService.ClientAuth.REQUIRED;
+            } else {
+                try {
+                    clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+                } catch (final IllegalArgumentException iae) {
+                    throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+                            rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+                }
+            }
+            sslContext = sslService.createSSLContext(clientAuth);
+        } else {
+            sslContext = null;
+        }
+
         try {
             final String uri = context.getProperty(URI).getValue();
-            mongoClient = new MongoClient(new MongoClientURI(uri));
+            if (sslContext == null) {
+                mongoClient = new MongoClient(new MongoClientURI(uri));
+            } else {
+                mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext)));
+            }
         } catch (Exception e) {
-            getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
+            // This base class is shared by several processors, so report the concrete type instead of a fixed name
+            getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
             throw e;
         }
     }
 
+    /**
+     * Creates the MongoDB client options used when an SSL Context Service is configured.
+     * SSL is enabled and sockets are created from the supplied context's socket factory.
+     * The method is protected so that subclasses can override the options that are built.
+     *
+     * @param sslContext the SSL context whose socket factory is used for connections
+     * @return a builder with SSL enabled, handed to {@link MongoClientURI} by the caller
+     */
+    protected Builder getClientOptions(final SSLContext sslContext) {
+        MongoClientOptions.Builder builder = MongoClientOptions.builder();
+        builder.sslEnabled(true);
+        builder.socketFactory(sslContext.getSocketFactory());
+        return builder;
+    }
+
     @OnStopped
     public final void closeClient() {
         if (mongoClient != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index c2b49d9..ebe7a24 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -100,35 +100,33 @@ public class GetMongo extends AbstractMongoProcessor {
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
 
-    private final List<PropertyDescriptor> descriptors;
-
-    private final Set<Relationship> relationships;
-
-    public GetMongo() {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(URI);
-        descriptors.add(DATABASE_NAME);
-        descriptors.add(COLLECTION_NAME);
-        descriptors.add(QUERY);
-        descriptors.add(PROJECTION);
-        descriptors.add(SORT);
-        descriptors.add(LIMIT);
-        descriptors.add(BATCH_SIZE);
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        this.relationships = Collections.unmodifiableSet(relationships);
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        // The inherited base descriptors already contain SSL_CONTEXT_SERVICE and CLIENT_AUTH; do not add them again
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(PROJECTION);
+        _propertyDescriptors.add(SORT);
+        _propertyDescriptors.add(LIMIT);
+        _propertyDescriptors.add(BATCH_SIZE);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(_relationships);
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        return this.relationships;
+        return relationships;
     }
 
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
+        return propertyDescriptors;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index ae4009c..5f6d875 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -106,36 +106,34 @@ public class PutMongo extends AbstractMongoProcessor {
         .defaultValue("UTF-8")
         .build();
 
-    private final List<PropertyDescriptor> descriptors;
-
-    private final Set<Relationship> relationships;
-
-    public PutMongo() {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(URI);
-        descriptors.add(DATABASE_NAME);
-        descriptors.add(COLLECTION_NAME);
-        descriptors.add(MODE);
-        descriptors.add(UPSERT);
-        descriptors.add(UPDATE_QUERY_KEY);
-        descriptors.add(WRITE_CONCERN);
-        descriptors.add(CHARACTER_SET);
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        // Relationships exposed by this processor
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+
+        // Inherited base descriptors first, followed by the properties declared in this class
+        final List<PropertyDescriptor> props = new ArrayList<>(descriptors);
+        props.add(MODE);
+        props.add(UPSERT);
+        props.add(UPDATE_QUERY_KEY);
+        props.add(WRITE_CONCERN);
+        props.add(CHARACTER_SET);
+        propertyDescriptors = Collections.unmodifiableList(props);
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        return this.relationships;
+        return relationships;
     }
 
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
+        return propertyDescriptors;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
new file mode 100644
index 0000000..1750cc2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
+
+/**
+ * Tests the SSL-related client creation in {@link AbstractMongoProcessor}.
+ */
+public class AbstractMongoProcessorTest {
+
+    MockAbstractMongoProcessor processor;
+    private TestRunner testRunner;
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new MockAbstractMongoProcessor();
+        testRunner = TestRunners.newTestRunner(processor);
+    }
+
+    /**
+     * Closes any client a test left open so that clients are not leaked between tests.
+     */
+    @After
+    public void tearDown() {
+        closeMongoClient();
+    }
+
+    @Test
+    public void testcreateClientWithSSL() throws Exception {
+        configureSslContextService();
+        processor.createClient(testRunner.getProcessContext());
+        assertNotNull(processor.mongoClient);
+        closeMongoClient();
+        testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT");
+        processor.createClient(testRunner.getProcessContext());
+        assertNotNull(processor.mongoClient);
+    }
+
+    @Test(expected = ProviderCreationException.class)
+    public void testcreateClientWithSSLBadClientAuth() throws Exception {
+        configureSslContextService();
+        processor.createClient(testRunner.getProcessContext());
+        assertNotNull(processor.mongoClient);
+        closeMongoClient();
+        testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD");
+        processor.createClient(testRunner.getProcessContext());
+    }
+
+    /**
+     * Registers and enables a mocked SSL Context Service and configures the processor to use it.
+     */
+    private void configureSslContextService() throws Exception {
+        SSLContextService sslService = mock(SSLContextService.class);
+        SSLContext sslContext = mock(SSLContext.class);
+        when(sslService.getIdentifier()).thenReturn("ssl-context");
+        when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
+        testRunner.addControllerService("ssl-context", sslService);
+        testRunner.enableControllerService(sslService);
+        testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
+        testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
+        testRunner.assertValid(sslService);
+    }
+
+    /**
+     * Closes and clears the processor's client, if one was created.
+     */
+    private void closeMongoClient() {
+        if (processor.mongoClient != null) {
+            processor.mongoClient.close();
+            processor.mongoClient = null;
+        }
+    }
+
+    /**
+     * Provides a stubbed processor instance for testing
+     */
+    public static class MockAbstractMongoProcessor extends AbstractMongoProcessor {
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+            // nothing to do
+        }
+
+        @Override
+        protected Builder getClientOptions(SSLContext sslContext) {
+            return MongoClientOptions.builder();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
index db9e87d..25c6f69 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
@@ -22,9 +22,7 @@
         <version>1.0.0-SNAPSHOT</version>
     </parent>
 
-    <groupId>org.apache.nifi</groupId>
     <artifactId>nifi-mongodb-bundle</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
     <packaging>pom</packaging>
 
     <modules>
@@ -32,4 +30,14 @@
         <module>nifi-mongodb-nar</module>
     </modules>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mongodb-processors</artifactId>
+                <version>1.0.0-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e702785..29b936b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,7 +219,7 @@ language governing permissions and limitations under the License. -->
             <dependency>
                 <groupId>org.mongodb</groupId>
                 <artifactId>mongo-java-driver</artifactId>
-                <version>3.0.2</version>
+                <version>3.2.2</version>
             </dependency>
             <dependency>
                 <groupId>com.wordnik</groupId>