You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/04/20 05:25:55 UTC
nifi git commit: NIFI-1197 checkstyle (+2 squashed commits) Squashed
commits: [b4e9b5f] NIFI-1197 fixed name/displayName on properties [d39f82b]
NIFI-1197 Added SSL support for MongoDB processors
Repository: nifi
Updated Branches:
refs/heads/master 0fed158d1 -> 0f05c77a7
NIFI-1197 checkstyle (+2 squashed commits)
Squashed commits:
[b4e9b5f] NIFI-1197 fixed name/displayName on properties
[d39f82b] NIFI-1197 Added SSL support for MongoDB processors
This closes #360.
Signed-off-by: Andy LoPresto <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f05c77a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f05c77a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f05c77a
Branch: refs/heads/master
Commit: 0f05c77a7376bc44ca1e2242c03cc11f201e6a1f
Parents: 0fed158
Author: Pierre Villard <pi...@gmail.com>
Authored: Sun Apr 17 17:24:59 2016 +0200
Committer: Andy LoPresto <al...@apache.org>
Committed: Tue Apr 19 20:25:19 2016 -0700
----------------------------------------------------------------------
.../nifi-mongodb-nar/pom.xml | 7 +-
.../nifi-mongodb-processors/pom.xml | 4 +
.../mongodb/AbstractMongoProcessor.java | 73 ++++++++++++-
.../nifi/processors/mongodb/GetMongo.java | 41 ++++----
.../nifi/processors/mongodb/PutMongo.java | 41 ++++----
.../mongodb/AbstractMongoProcessorTest.java | 104 +++++++++++++++++++
nifi-nar-bundles/nifi-mongodb-bundle/pom.xml | 12 ++-
pom.xml | 2 +-
8 files changed, 235 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
index a17b05f..e714da3 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
@@ -23,7 +23,6 @@
</parent>
<artifactId>nifi-mongodb-nar</artifactId>
- <version>1.0.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
@@ -33,8 +32,12 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-processors</artifactId>
- <version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index ff67f79..0a26cd6 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -39,6 +39,10 @@
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index fae007f..7e3a196 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -19,16 +19,26 @@
package org.apache.nifi.processors.mongodb;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -52,6 +62,34 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ /**
+  * Optional reference to an {@link SSLContextService} controller service. When it is set, the
+  * Mongo client is created with an SSL context obtained from that service.
+  */
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ + "connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ /**
+  * Optional client authentication policy. Its allowable values are the constants of
+  * {@link SSLContextService.ClientAuth} and it defaults to REQUIRED. The value is only
+  * consulted when an SSL Context Service has been configured.
+  */
+ public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+ .name("ssl-client-auth")
+ .displayName("Client Auth")
+ .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ + "has been defined and enabled.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue("REQUIRED")
+ .build();
+
+ /**
+  * Property descriptors shared by the Mongo processors: the connection properties followed by
+  * the SSL-related ones. Subclasses copy this list and append their own properties, so they
+  * must not add any of these entries a second time.
+  */
+ static final List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+ static {
+     descriptors.add(URI);
+     descriptors.add(DATABASE_NAME);
+     descriptors.add(COLLECTION_NAME);
+     descriptors.add(SSL_CONTEXT_SERVICE);
+     descriptors.add(CLIENT_AUTH);
+ }
protected MongoClient mongoClient;
@@ -63,15 +101,48 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
getLogger().info("Creating MongoClient");
+ // Set up the client for secure (SSL/TLS communications) if configured to do so
+ final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+ final SSLContext sslContext;
+
+ if (sslService != null) {
+ final SSLContextService.ClientAuth clientAuth;
+ if (StringUtils.isBlank(rawClientAuth)) {
+ clientAuth = SSLContextService.ClientAuth.REQUIRED;
+ } else {
+ try {
+ clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+ } catch (final IllegalArgumentException iae) {
+ throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+ rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+ }
+ }
+ sslContext = sslService.createSSLContext(clientAuth);
+ } else {
+ sslContext = null;
+ }
+
try {
final String uri = context.getProperty(URI).getValue();
- mongoClient = new MongoClient(new MongoClientURI(uri));
+ if(sslContext == null) {
+ mongoClient = new MongoClient(new MongoClientURI(uri));
+ } else {
+ mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext)));
+ }
} catch (Exception e) {
getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e);
throw e;
}
}
+ /**
+  * Builds the Mongo client options used when an SSL context is available.
+  *
+  * @param sslContext the SSL context whose socket factory the client should use
+  * @return an options builder with SSL enabled and the context's socket factory installed
+  */
+ protected Builder getClientOptions(final SSLContext sslContext) {
+     return MongoClientOptions.builder()
+             .sslEnabled(true)
+             .socketFactory(sslContext.getSocketFactory());
+ }
+
@OnStopped
public final void closeClient() {
if (mongoClient != null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index c2b49d9..ebe7a24 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -100,35 +100,34 @@ public class GetMongo extends AbstractMongoProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
- private final List<PropertyDescriptor> descriptors;
-
- private final Set<Relationship> relationships;
-
- public GetMongo() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(URI);
- descriptors.add(DATABASE_NAME);
- descriptors.add(COLLECTION_NAME);
- descriptors.add(QUERY);
- descriptors.add(PROJECTION);
- descriptors.add(SORT);
- descriptors.add(LIMIT);
- descriptors.add(BATCH_SIZE);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
+ private final static Set<Relationship> relationships;
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ static {
+     // Start from the descriptors shared by all Mongo processors, then append the
+     // GetMongo-specific query properties.
+     List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+     _propertyDescriptors.addAll(descriptors);
+     _propertyDescriptors.add(QUERY);
+     _propertyDescriptors.add(PROJECTION);
+     _propertyDescriptors.add(SORT);
+     _propertyDescriptors.add(LIMIT);
+     _propertyDescriptors.add(BATCH_SIZE);
+     // SSL_CONTEXT_SERVICE and CLIENT_AUTH are already part of the shared 'descriptors'
+     // list, so adding them again here would leave duplicate entries in the result.
+     propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+     final Set<Relationship> _relationships = new HashSet<>();
+     _relationships.add(REL_SUCCESS);
+     relationships = Collections.unmodifiableSet(_relationships);
+ }
@Override
public Set<Relationship> getRelationships() {
- return this.relationships;
+ return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
+ return propertyDescriptors;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index ae4009c..5f6d875 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -106,36 +106,33 @@ public class PutMongo extends AbstractMongoProcessor {
.defaultValue("UTF-8")
.build();
- private final List<PropertyDescriptor> descriptors;
-
- private final Set<Relationship> relationships;
-
- public PutMongo() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(URI);
- descriptors.add(DATABASE_NAME);
- descriptors.add(COLLECTION_NAME);
- descriptors.add(MODE);
- descriptors.add(UPSERT);
- descriptors.add(UPDATE_QUERY_KEY);
- descriptors.add(WRITE_CONCERN);
- descriptors.add(CHARACTER_SET);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
+ private final static Set<Relationship> relationships;
+ private final static List<PropertyDescriptor> propertyDescriptors;
+
+ static {
+     // Shared Mongo descriptors first, followed by the PutMongo-specific ones.
+     final List<PropertyDescriptor> allProperties = new ArrayList<>(descriptors);
+     Collections.addAll(allProperties, MODE, UPSERT, UPDATE_QUERY_KEY, WRITE_CONCERN, CHARACTER_SET);
+     propertyDescriptors = Collections.unmodifiableList(allProperties);
+
+     // The processor exposes the success and failure relationships.
+     final Set<Relationship> allRelationships = new HashSet<>();
+     Collections.addAll(allRelationships, REL_SUCCESS, REL_FAILURE);
+     relationships = Collections.unmodifiableSet(allRelationships);
+ }
@Override
public Set<Relationship> getRelationships() {
- return this.relationships;
+ return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
+ return propertyDescriptors;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
new file mode 100644
index 0000000..1750cc2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
+
+/**
+ * Tests the SSL-related client creation logic of {@link AbstractMongoProcessor} using a stubbed
+ * processor and a mocked {@link SSLContextService}.
+ */
+public class AbstractMongoProcessorTest {
+
+    MockAbstractMongoProcessor processor;
+    private TestRunner testRunner;
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new MockAbstractMongoProcessor();
+        testRunner = TestRunners.newTestRunner(processor);
+    }
+
+    /**
+     * A client must be created when an SSL context service is configured, both with the default
+     * client auth policy and with an explicitly selected one.
+     */
+    @Test
+    public void testcreateClientWithSSL() throws Exception {
+        configureSslContextService();
+        processor.createClient(testRunner.getProcessContext());
+        assertNotNull(processor.mongoClient);
+        releaseClient();
+        testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT");
+        processor.createClient(testRunner.getProcessContext());
+        assertNotNull(processor.mongoClient);
+        releaseClient();
+    }
+
+    /**
+     * An unrecognized client auth value must make client creation fail with a
+     * {@link ProviderCreationException}.
+     */
+    @Test(expected = ProviderCreationException.class)
+    public void testcreateClientWithSSLBadClientAuth() throws Exception {
+        configureSslContextService();
+        processor.createClient(testRunner.getProcessContext());
+        assertNotNull(processor.mongoClient);
+        releaseClient();
+        testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD");
+        processor.createClient(testRunner.getProcessContext());
+    }
+
+    /**
+     * Registers and enables a mocked SSL context service under the id "ssl-context", then sets
+     * the Mongo URI and points the processor at that service.
+     */
+    private void configureSslContextService() throws Exception {
+        SSLContextService sslService = mock(SSLContextService.class);
+        SSLContext sslContext = mock(SSLContext.class);
+        when(sslService.getIdentifier()).thenReturn("ssl-context");
+        when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext);
+        testRunner.addControllerService("ssl-context", sslService);
+        testRunner.enableControllerService(sslService);
+        testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
+        testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
+        testRunner.assertValid(sslService);
+    }
+
+    /**
+     * Closes the client created by the processor before dropping the reference, so the test does
+     * not leave an open client behind.
+     */
+    private void releaseClient() {
+        if (processor.mongoClient != null) {
+            processor.mongoClient.close();
+            processor.mongoClient = null;
+        }
+    }
+
+    /**
+     * Provides a stubbed processor instance for testing
+     */
+    public static class MockAbstractMongoProcessor extends AbstractMongoProcessor {
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+            // nothing to do
+        }
+
+        @Override
+        protected Builder getClientOptions(SSLContext sslContext) {
+            // Return default options so the mocked SSLContext is never asked for a socket factory.
+            return MongoClientOptions.builder();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
index db9e87d..25c6f69 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
@@ -22,9 +22,7 @@
<version>1.0.0-SNAPSHOT</version>
</parent>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-bundle</artifactId>
- <version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
@@ -32,4 +30,14 @@
<module>nifi-mongodb-nar</module>
</modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mongodb-processors</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e702785..29b936b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -219,7 +219,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
- <version>3.0.2</version>
+ <version>3.2.2</version>
</dependency>
<dependency>
<groupId>com.wordnik</groupId>