You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/09/19 11:34:45 UTC

nifi git commit: NIFI-5239 Made a client service an optional source of connection pooling in Mongo processors.

Repository: nifi
Updated Branches:
  refs/heads/master c56a7e9ba -> 1dea8faa0


NIFI-5239 Made a client service an optional source of connection pooling in Mongo processors.

This closes #2896

Signed-off-by: zenfenan <ze...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1dea8faa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1dea8faa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1dea8faa

Branch: refs/heads/master
Commit: 1dea8faa06fc954f6ee7e27082d25b63b65c231e
Parents: c56a7e9
Author: Mike Thomsen <mi...@gmail.com>
Authored: Sun Jul 15 08:07:21 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Wed Sep 19 17:04:25 2018 +0530

----------------------------------------------------------------------
 .../nifi-mongodb-client-service-api/pom.xml     |  11 +
 .../nifi/mongodb/MongoDBClientService.java      |  73 ++++--
 .../nifi-mongodb-processors/pom.xml             |  11 +
 .../mongodb/AbstractMongoProcessor.java         |  77 +++++--
 .../nifi/processors/mongodb/PutMongoRecord.java |   5 +-
 .../nifi/processors/mongodb/DeleteMongoIT.java  |  22 ++
 .../nifi/processors/mongodb/GetMongoIT.java     |  21 +-
 .../nifi/processors/mongodb/PutMongoIT.java     |  22 +-
 .../processors/mongodb/PutMongoRecordIT.java    |  27 ++-
 .../mongodb/RunMongoAggregationIT.java          |  24 ++
 .../AbstractMongoDBControllerService.java       | 228 -------------------
 .../nifi/mongodb/MongoDBControllerService.java  | 217 +++++++++---------
 .../nifi/mongodb/MongoDBLookupService.java      |  40 +++-
 .../mongodb/MongoDBControllerServiceIT.java     | 124 ----------
 .../nifi/mongodb/MongoDBLookupServiceIT.java    |  27 ++-
 nifi-nar-bundles/nifi-mongodb-bundle/pom.xml    |  21 ++
 16 files changed, 427 insertions(+), 523 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
index efd66ab..88f0641 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
@@ -41,5 +41,16 @@
             <artifactId>mongo-java-driver</artifactId>
             <version>${mongo.driver.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-json-utils</artifactId>
+            <version>1.8.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
index 5a3a4b2..2ae9265 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
@@ -17,33 +17,66 @@
 
 package org.apache.nifi.mongodb;
 
+import com.mongodb.WriteConcern;
 import com.mongodb.client.MongoDatabase;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
 import org.bson.Document;
 
-import java.util.List;
-
 public interface MongoDBClientService extends ControllerService {
+     String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
+     String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
+     String WRITE_CONCERN_FSYNCED = "FSYNCED";
+     String WRITE_CONCERN_JOURNALED = "JOURNALED";
+     String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
+     String WRITE_CONCERN_MAJORITY = "MAJORITY";
+
+     PropertyDescriptor URI = new PropertyDescriptor.Builder()
+            .name("mongo-uri")
+            .displayName("Mongo URI")
+            .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .build();
+     PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("ssl-context-service")
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+                    + "connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+     PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+            .name("ssl-client-auth")
+            .displayName("Client Auth")
+            .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+                    + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+                    + "has been defined and enabled.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.values())
+            .defaultValue("REQUIRED")
+            .build();
+
+     PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
+            .name("mongo-write-concern")
+            .displayName("Write Concern")
+            .description("The write concern to use")
+            .required(true)
+            .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
+                    WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
+            .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
+            .build();
+
+
     default Document convertJson(String query) {
         return Document.parse(query);
     }
-
-    long count(Document query);
-    void delete(Document query);
-    boolean exists(Document query);
-    Document findOne(Document query);
-    Document findOne(Document query, Document projection);
-    List<Document> findMany(Document query);
-    List<Document> findMany(Document query, int limit);
-    List<Document> findMany(Document query, Document sort, int limit);
-    void insert(Document doc);
-    void insert(List<Document> docs);
-    void update(Document query, Document update);
-    void update(Document query, Document update, boolean multiple);
-    void updateOne(Document query, Document update);
-    void upsert(Document query, Document update);
-    void dropDatabase();
-    void dropCollection();
-
+    WriteConcern getWriteConcern(final ConfigurationContext context);
     MongoDatabase getDatabase(String name);
+    String getURI();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index c36bf7d..6860340 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -98,5 +98,16 @@
             <version>1.8.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mongodb-client-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mongodb-services</artifactId>
+            <version>1.8.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 507aa42..22f25b5 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -32,9 +32,11 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.authentication.exception.ProviderCreationException;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -52,6 +54,8 @@ import java.io.UnsupportedEncodingException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -71,16 +75,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
     protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
             "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions.");
 
-    protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+        .name("mongo-client-service")
+        .displayName("Client Service")
+        .description("If configured, this property will use the assigned client service for connection pooling.")
+        .required(false)
+        .identifiesControllerService(MongoDBClientService.class)
+        .build();
+
+    static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
         .name("Mongo URI")
         .displayName("Mongo URI")
         .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
-        .required(true)
+        .required(false)
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
-    protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
         .name("Mongo Database Name")
         .displayName("Mongo Database Name")
         .description("The name of the database to use")
@@ -89,7 +101,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
-    protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
         .name("Mongo Collection Name")
         .description("The name of the collection to use")
         .required(true)
@@ -199,21 +211,30 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static List<PropertyDescriptor> descriptors = new ArrayList<>();
+    static final List<PropertyDescriptor> descriptors;
 
     static {
-        descriptors.add(URI);
-        descriptors.add(DATABASE_NAME);
-        descriptors.add(COLLECTION_NAME);
-        descriptors.add(SSL_CONTEXT_SERVICE);
-        descriptors.add(CLIENT_AUTH);
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.add(CLIENT_SERVICE);
+        _temp.add(URI);
+        _temp.add(DATABASE_NAME);
+        _temp.add(COLLECTION_NAME);
+        _temp.add(SSL_CONTEXT_SERVICE);
+        _temp.add(CLIENT_AUTH);
+        descriptors = Collections.unmodifiableList(_temp);
     }
 
     protected ObjectMapper objectMapper;
     protected MongoClient mongoClient;
+    protected MongoDBClientService clientService;
 
     @OnScheduled
     public final void createClient(ProcessContext context) throws IOException {
+        if (context.getProperty(CLIENT_SERVICE).isSet()) {
+            clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+            return;
+        }
+
         if (mongoClient != null) {
             closeClient();
         }
@@ -270,20 +291,10 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
         }
     }
 
-    protected MongoDatabase getDatabase(final ProcessContext context) {
-        return getDatabase(context, null);
-    }
-
     protected MongoDatabase getDatabase(final ProcessContext context, final FlowFile flowFile) {
         final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-        if (StringUtils.isEmpty(databaseName)) {
-            throw new ProcessException("Database name was empty after expression language evaluation.");
-        }
-        return mongoClient.getDatabase(databaseName);
-    }
 
-    protected MongoCollection<Document> getCollection(final ProcessContext context) {
-        return getCollection(context, null);
+        return clientService!= null ? clientService.getDatabase(databaseName) : mongoClient.getDatabase(databaseName);
     }
 
     protected MongoCollection<Document> getCollection(final ProcessContext context, final FlowFile flowFile) {
@@ -295,7 +306,11 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
     }
 
     protected String getURI(final ProcessContext context) {
-        return context.getProperty(URI).evaluateAttributeExpressions().getValue();
+        if (clientService != null) {
+            return clientService.getURI();
+        } else {
+            return context.getProperty(URI).evaluateAttributeExpressions().getValue();
+        }
     }
 
     protected WriteConcern getWriteConcern(final ProcessContext context) {
@@ -346,4 +361,22 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
             objectMapper.setDateFormat(df);
         }
     }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> retVal = new ArrayList<>();
+
+        boolean clientIsSet = context.getProperty(CLIENT_SERVICE).isSet();
+        boolean uriIsSet    = context.getProperty(URI).isSet();
+
+        if (clientIsSet && uriIsSet) {
+            String msg = "The client service and URI fields cannot be set at the same time.";
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        } else if (!clientIsSet && !uriIsSet) {
+            String msg = "The client service or the URI field must be set.";
+            retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+        }
+
+        return retVal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index d3698bb..5fbc819 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -150,7 +150,10 @@ public class PutMongoRecord extends AbstractMongoProcessor {
             error = true;
         } finally {
             if (!error) {
-                session.getProvenanceReporter().send(flowFile, context.getProperty(URI).evaluateAttributeExpressions().getValue(), String.format("Added %d documents to MongoDB.", added));
+                String url = clientService != null
+                        ? clientService.getURI()
+                        : context.getProperty(URI).evaluateAttributeExpressions().getValue();
+                session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
index d10dfc7..9f3c016 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
@@ -19,6 +19,8 @@
 
 package org.apache.nifi.processors.mongodb;
 
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
 import org.apache.nifi.util.TestRunner;
 import org.bson.Document;
 import org.junit.After;
@@ -119,4 +121,24 @@ public class DeleteMongoIT extends MongoWriteTestBase {
 
         Assert.assertEquals("A document was deleted", 3, collection.count(Document.parse("{}")));
     }
+
+    @Test
+    public void testClientService() throws Exception {
+        MongoDBClientService clientService = new MongoDBControllerService();
+        TestRunner runner = init(DeleteMongo.class);
+        runner.addControllerService("clientService", clientService);
+        runner.removeProperty(DeleteMongo.URI);
+        runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY);
+        runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+        runner.setProperty(DeleteMongo.CLIENT_SERVICE, "clientService");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        runner.enqueue("{}");
+        runner.run();
+        runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
+        runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
+
+        Assert.assertEquals(0, collection.count());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 42045c5..76139d5 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -26,6 +26,8 @@ import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
@@ -107,9 +109,8 @@ public class GetMongoIT {
         if (pc instanceof MockProcessContext) {
             results = ((MockProcessContext) pc).validate();
         }
-        Assert.assertEquals(3, results.size());
+        Assert.assertEquals(2, results.size());
         Iterator<ValidationResult> it = results.iterator();
-        Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
 
@@ -577,6 +578,20 @@ public class GetMongoIT {
         Pattern format = Pattern.compile("([\\d]{4})-([\\d]{2})-([\\d]{2})");
 
         Assert.assertTrue(result.containsKey("date_field"));
-        Assert.assertTrue(format.matcher((String)result.get("date_field")).matches());
+        Assert.assertTrue(format.matcher((String) result.get("date_field")).matches());
+    }
+
+    public void testClientService() throws Exception {
+        MongoDBClientService clientService = new MongoDBControllerService();
+        runner.addControllerService("clientService", clientService);
+        runner.removeProperty(GetMongo.URI);
+        runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+        runner.setProperty(GetMongo.CLIENT_SERVICE, "clientService");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        runner.enqueue("{}");
+        runner.run();
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
index 6ceff7b..de08a80 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.mongodb;
 
 import com.mongodb.client.MongoCursor;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
@@ -69,9 +71,8 @@ public class PutMongoIT extends MongoWriteTestBase {
         if (pc instanceof MockProcessContext) {
             results = ((MockProcessContext) pc).validate();
         }
-        Assert.assertEquals(3, results.size());
+        Assert.assertEquals(2, results.size());
         Iterator<ValidationResult> it = results.iterator();
-        Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
 
@@ -502,4 +503,21 @@ public class PutMongoIT extends MongoWriteTestBase {
             index++;
         }
     }
+
+    @Test
+    public void testClientService() throws Exception {
+        MongoDBClientService clientService = new MongoDBControllerService();
+        TestRunner runner = init(PutMongo.class);
+        runner.addControllerService("clientService", clientService);
+        runner.removeProperty(PutMongo.URI);
+        runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+        runner.setProperty(PutMongo.CLIENT_SERVICE, "clientService");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        runner.enqueue("{ \"msg\": \"Hello, world\" }");
+        runner.run();
+        runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index 2a24b32..371f38c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -21,6 +21,8 @@ import org.apache.avro.Schema;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -94,9 +96,8 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         if (pc instanceof MockProcessContext) {
             results = ((MockProcessContext) pc).validate();
         }
-        Assert.assertEquals(4, results.size());
+        Assert.assertEquals(3, results.size());
         Iterator<ValidationResult> it = results.iterator();
-        Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
         Assert.assertTrue(it.next().toString().contains("is invalid because Record Reader is required"));
@@ -144,12 +145,30 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
-        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongoRecord.REL_SUCCESS).get(0);
-
 
         // verify 1 doc inserted into the collection
         assertEquals(5, collection.count());
         //assertEquals(doc, collection.find().first());
+
+
+        runner.clearTransferState();
+
+        /*
+         * Test it with the client service.
+         */
+        MongoDBClientService clientService = new MongoDBControllerService();
+        runner.addControllerService("clientService", clientService);
+        runner.removeProperty(PutMongoRecord.URI);
+        runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+        runner.setProperty(PutMongoRecord.CLIENT_SERVICE, "clientService");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        collection.deleteMany(new Document());
+        runner.enqueue("");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
+        assertEquals(5, collection.count());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
index 02d9ad4..c74692c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -212,4 +214,26 @@ public class RunMongoAggregationIT {
             Assert.assertTrue("Missing $group", queryAttr.contains("$group"));
         }
     }
+
+    @Test
+    public void testClientService() throws Exception {
+        MongoDBClientService clientService = new MongoDBControllerService();
+        runner.addControllerService("clientService", clientService);
+        runner.removeProperty(RunMongoAggregation.URI);
+        runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+        runner.setProperty(RunMongoAggregation.CLIENT_SERVICE, "clientService");
+        runner.setProperty(RunMongoAggregation.QUERY, "[\n" +
+                        "    {\n" +
+                        "        \"$project\": {\n" +
+                        "            \"_id\": 0,\n" +
+                        "            \"val\": 1\n" +
+                        "        }\n" +
+                        "    }]");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        runner.enqueue("{}");
+        runner.run();
+        runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 9);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java
deleted file mode 100644
index 5b6c97e..0000000
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.mongodb;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientOptions.Builder;
-import com.mongodb.MongoClientURI;
-import com.mongodb.WriteConcern;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.authentication.exception.ProviderCreationException;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.ssl.SSLContextService;
-import org.bson.Document;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class AbstractMongoDBControllerService extends AbstractControllerService {
-    static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
-    static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
-    static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
-    static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
-    static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
-    static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
-
-    protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
-            .name("mongo-uri")
-            .displayName("Mongo URI")
-            .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(Validation.DOCUMENT_VALIDATOR)
-            .build();
-    protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
-            .name("mongo-db-name")
-            .displayName("Mongo Database Name")
-            .description("The name of the database to use")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
-            .name("mongo-collection-name")
-            .displayName("Mongo Collection Name")
-            .description("The name of the collection to use")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("ssl-context-service")
-            .displayName("SSL Context Service")
-            .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
-                    + "connections.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
-            .name("ssl-client-auth")
-            .displayName("Client Auth")
-            .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
-                    + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
-                    + "has been defined and enabled.")
-            .required(false)
-            .allowableValues(SSLContextService.ClientAuth.values())
-            .defaultValue("REQUIRED")
-            .build();
-
-    public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
-            .name("mongo-write-concern")
-            .displayName("Write Concern")
-            .description("The write concern to use")
-            .required(true)
-            .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
-                    WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
-            .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
-            .build();
-
-    static List<PropertyDescriptor> descriptors = new ArrayList<>();
-
-    static {
-        descriptors.add(URI);
-        descriptors.add(DATABASE_NAME);
-        descriptors.add(COLLECTION_NAME);
-        descriptors.add(SSL_CONTEXT_SERVICE);
-        descriptors.add(CLIENT_AUTH);
-    }
-
-    protected MongoClient mongoClient;
-
-    protected final void createClient(ConfigurationContext context) throws IOException {
-        if (mongoClient != null) {
-            closeClient();
-        }
-
-        getLogger().info("Creating MongoClient");
-
-        // Set up the client for secure (SSL/TLS communications) if configured to do so
-        final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
-        final SSLContext sslContext;
-
-        if (sslService != null) {
-            final SSLContextService.ClientAuth clientAuth;
-            if (StringUtils.isBlank(rawClientAuth)) {
-                clientAuth = SSLContextService.ClientAuth.REQUIRED;
-            } else {
-                try {
-                    clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
-                } catch (final IllegalArgumentException iae) {
-                    throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
-                            rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
-                }
-            }
-            sslContext = sslService.createSSLContext(clientAuth);
-        } else {
-            sslContext = null;
-        }
-
-        try {
-            if(sslContext == null) {
-                mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
-            } else {
-                mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
-            }
-        } catch (Exception e) {
-            getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
-            throw e;
-        }
-    }
-
-    protected Builder getClientOptions(final SSLContext sslContext) {
-        MongoClientOptions.Builder builder = MongoClientOptions.builder();
-        builder.sslEnabled(true);
-        builder.socketFactory(sslContext.getSocketFactory());
-        return builder;
-    }
-
-    @OnStopped
-    public final void closeClient() {
-        if (mongoClient != null) {
-            mongoClient.close();
-            mongoClient = null;
-        }
-    }
-
-    protected MongoDatabase getDatabase(final ConfigurationContext context) {
-        return getDatabase(context, null);
-    }
-
-    protected MongoDatabase getDatabase(final ConfigurationContext context, final FlowFile flowFile) {
-        final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-        return mongoClient.getDatabase(databaseName);
-    }
-
-    protected MongoCollection<Document> getCollection(final ConfigurationContext context) {
-        return getCollection(context, null);
-    }
-
-    protected MongoCollection<Document> getCollection(final ConfigurationContext context, final FlowFile flowFile) {
-        final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
-        return getDatabase(context, flowFile).getCollection(collectionName);
-    }
-
-    protected String getURI(final ConfigurationContext context) {
-        return context.getProperty(URI).evaluateAttributeExpressions().getValue();
-    }
-
-    protected WriteConcern getWriteConcern(final ConfigurationContext context) {
-        final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
-        WriteConcern writeConcern = null;
-        switch (writeConcernProperty) {
-            case WRITE_CONCERN_ACKNOWLEDGED:
-                writeConcern = WriteConcern.ACKNOWLEDGED;
-                break;
-            case WRITE_CONCERN_UNACKNOWLEDGED:
-                writeConcern = WriteConcern.UNACKNOWLEDGED;
-                break;
-            case WRITE_CONCERN_FSYNCED:
-                writeConcern = WriteConcern.FSYNCED;
-                break;
-            case WRITE_CONCERN_JOURNALED:
-                writeConcern = WriteConcern.JOURNALED;
-                break;
-            case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
-                writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
-                break;
-            case WRITE_CONCERN_MAJORITY:
-                writeConcern = WriteConcern.MAJORITY;
-                break;
-            default:
-                writeConcern = WriteConcern.ACKNOWLEDGED;
-        }
-        return writeConcern;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index a1f9b2b..a851ad6 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -17,156 +17,159 @@
 
 package org.apache.nifi.mongodb;
 
-import com.mongodb.client.FindIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientURI;
+import com.mongodb.WriteConcern;
 import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.UpdateOptions;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.reporting.InitializationException;
-import org.bson.Document;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
 
-import java.io.IOException;
+import javax.net.ssl.SSLContext;
 import java.util.ArrayList;
 import java.util.List;
 
 @Tags({"mongo", "mongodb", "service"})
 @CapabilityDescription(
-    "Provides a controller service that wraps most of the functionality of the MongoDB driver."
+    "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
+    "other Mongo-related components."
 )
-public class MongoDBControllerService extends AbstractMongoDBControllerService implements MongoDBClientService {
-    private MongoDatabase db;
-    private MongoCollection<Document> col;
+public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
+    private String uri;
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+    public void onEnabled(final ConfigurationContext context) {
+        this.uri = context.getProperty(URI).evaluateAttributeExpressions().getValue();
         this.createClient(context);
-        this.db = this.mongoClient.getDatabase(context.getProperty(MongoDBControllerService.DATABASE_NAME).getValue());
-        this.col = this.db.getCollection(context.getProperty(MongoDBControllerService.COLLECTION_NAME).getValue());
-    }
-
-    @OnDisabled
-    public void onDisable() {
-        this.mongoClient.close();
-    }
-
-    @Override
-    public long count(Document query) {
-        return this.col.count(query);
-    }
-
-    @Override
-    public void delete(Document query) {
-        this.col.deleteMany(query);
     }
 
-    @Override
-    public boolean exists(Document query) {
-        return this.col.count(query) > 0;
-    }
-
-    @Override
-    public Document findOne(Document query) {
-        MongoCursor<Document> cursor  = this.col.find(query).limit(1).iterator();
-        Document retVal = cursor.tryNext();
-        cursor.close();
-
-        return retVal;
-    }
+    static List<PropertyDescriptor> descriptors = new ArrayList<>();
 
-    @Override
-    public Document findOne(Document query, Document projection) {
-        MongoCursor<Document> cursor  = projection != null
-                ? this.col.find(query).projection(projection).limit(1).iterator()
-                : this.col.find(query).limit(1).iterator();
-        Document retVal = cursor.tryNext();
-        cursor.close();
-
-        return retVal;
-    }
-
-    @Override
-    public List<Document> findMany(Document query) {
-        return findMany(query, null, -1);
+    static {
+        descriptors.add(URI);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(CLIENT_AUTH);
     }
 
-    @Override
-    public List<Document> findMany(Document query, int limit) {
-        return findMany(query, null, limit);
-    }
+    protected MongoClient mongoClient;
 
-    @Override
-    public List<Document> findMany(Document query, Document sort, int limit) {
-        FindIterable<Document> fi = this.col.find(query);
-        if (limit > 0) {
-            fi = fi.limit(limit);
-        }
-        if (sort != null) {
-            fi = fi.sort(sort);
-        }
-        MongoCursor<Document> cursor = fi.iterator();
-        List<Document> retVal = new ArrayList<>();
-        while (cursor.hasNext()) {
-            retVal.add(cursor.next());
+    protected final void createClient(ConfigurationContext context) {
+        if (mongoClient != null) {
+            closeClient();
         }
-        cursor.close();
 
-        return retVal;
-    }
+        getLogger().info("Creating MongoClient");
+
+        // Set up the client for secure (SSL/TLS communications) if configured to do so
+        final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+        final SSLContext sslContext;
+
+        if (sslService != null) {
+            final SSLContextService.ClientAuth clientAuth;
+            if (StringUtils.isBlank(rawClientAuth)) {
+                clientAuth = SSLContextService.ClientAuth.REQUIRED;
+            } else {
+                try {
+                    clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+                } catch (final IllegalArgumentException iae) {
+                    throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+                            rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+                }
+            }
+            sslContext = sslService.createSSLContext(clientAuth);
+        } else {
+            sslContext = null;
+        }
 
-    @Override
-    public void insert(Document doc) {
-        this.col.insertOne(doc);
+        try {
+            if(sslContext == null) {
+                mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
+            } else {
+                mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
+            }
+        } catch (Exception e) {
+            getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
+            throw e;
+        }
     }
 
-    @Override
-    public void insert(List<Document> docs) {
-        this.col.insertMany(docs);
+    protected MongoClientOptions.Builder getClientOptions(final SSLContext sslContext) {
+        MongoClientOptions.Builder builder = MongoClientOptions.builder();
+        builder.sslEnabled(true);
+        builder.socketFactory(sslContext.getSocketFactory());
+        return builder;
     }
 
-    @Override
-    public void update(Document query, Document update, boolean multiple) {
-        if (multiple) {
-            this.col.updateMany(query, update);
-        } else {
-            this.col.updateOne(query, update);
+    @OnStopped
+    public final void closeClient() {
+        if (mongoClient != null) {
+            mongoClient.close();
+            mongoClient = null;
         }
     }
 
-    @Override
-    public void update(Document query, Document update) {
-        update(query, update, true);
+    protected String getURI(final ConfigurationContext context) {
+        return context.getProperty(URI).evaluateAttributeExpressions().getValue();
+    }
+
+    @Override
+    public WriteConcern getWriteConcern(final ConfigurationContext context) {
+        final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
+        WriteConcern writeConcern = null;
+        switch (writeConcernProperty) {
+            case WRITE_CONCERN_ACKNOWLEDGED:
+                writeConcern = WriteConcern.ACKNOWLEDGED;
+                break;
+            case WRITE_CONCERN_UNACKNOWLEDGED:
+                writeConcern = WriteConcern.UNACKNOWLEDGED;
+                break;
+            case WRITE_CONCERN_FSYNCED:
+                writeConcern = WriteConcern.FSYNCED;
+                break;
+            case WRITE_CONCERN_JOURNALED:
+                writeConcern = WriteConcern.JOURNALED;
+                break;
+            case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
+                writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
+                break;
+            case WRITE_CONCERN_MAJORITY:
+                writeConcern = WriteConcern.MAJORITY;
+                break;
+            default:
+                writeConcern = WriteConcern.ACKNOWLEDGED;
+        }
+        return writeConcern;
     }
 
     @Override
-    public void updateOne(Document query, Document update) {
-        this.update(query, update, false);
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
     }
 
-    @Override
-    public void upsert(Document query, Document update) {
-        this.col.updateOne(query, update, new UpdateOptions().upsert(true));
+    @OnDisabled
+    public void onDisable() {
+        this.mongoClient.close();
     }
 
-    @Override
-    public void dropDatabase() {
-        this.db.drop();
-        this.col = null;
-    }
 
     @Override
-    public void dropCollection() {
-        this.col.drop();
-        this.col = null;
+    public MongoDatabase getDatabase(String name) {
+        return mongoClient.getDatabase(name);
     }
 
     @Override
-    public MongoDatabase getDatabase(String name) {
-        return mongoClient.getDatabase(name);
+    public String getURI() {
+        return uri;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
index cd1829a..be8aa89 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.mongodb;
 
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -24,9 +26,11 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.LookupService;
 import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
 import org.apache.nifi.serialization.record.MapRecord;
@@ -63,6 +67,9 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
     "then the entire MongoDB result document minus the _id field will be returned as a record."
 )
 public class MongoDBLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Object> {
+    private volatile String databaseName;
+    private volatile String collection;
+
     public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
         .name("mongo-lookup-client-service")
         .displayName("Client Service")
@@ -70,7 +77,22 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
         .required(true)
         .identifiesControllerService(MongoDBClientService.class)
         .build();
-
+    public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+        .name("mongo-db-name")
+        .displayName("Mongo Database Name")
+        .description("The name of the database to use")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+        .name("mongo-collection-name")
+        .displayName("Mongo Collection Name")
+        .description("The name of the collection to use")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
     public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder()
         .name("mongo-lookup-value-field")
         .displayName("Lookup Value Field")
@@ -113,7 +135,7 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
         }
 
         try {
-            Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query);
+            Document result = findOne(query, projection);
 
             if(result == null) {
                 return Optional.empty();
@@ -149,6 +171,9 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
 
         this.schemaNameProperty = context.getProperty(SchemaAccessUtils.SCHEMA_NAME).getValue();
 
+        this.databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions().getValue();
+        this.collection   = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();
+
         String configuredProjection = context.getProperty(PROJECTION).isSet()
             ? context.getProperty(PROJECTION).getValue()
             : null;
@@ -187,9 +212,20 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
         _temp.add(SCHEMA_BRANCH_NAME);
         _temp.add(SCHEMA_TEXT);
         _temp.add(CONTROLLER_SERVICE);
+        _temp.add(DATABASE_NAME);
+        _temp.add(COLLECTION_NAME);
         _temp.add(LOOKUP_VALUE_FIELD);
         _temp.add(PROJECTION);
 
         return Collections.unmodifiableList(_temp);
     }
+
+    private Document findOne(Document query, Document projection) {
+        MongoCollection col = controllerService.getDatabase(databaseName).getCollection(collection);
+        MongoCursor<Document> it = (projection != null ? col.find(query).projection(projection) : col.find(query)).iterator();
+        Document retVal = it.hasNext() ? it.next() : null;
+        it.close();
+
+        return retVal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
index 33a173c..cd306f9 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
@@ -17,22 +17,13 @@
 
 package org.apache.nifi.mongodb;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.bson.Document;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
 
 public class MongoDBControllerServiceIT {
     private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
@@ -46,15 +37,12 @@ public class MongoDBControllerServiceIT {
         runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
         service = new MongoDBControllerService();
         runner.addControllerService("Client Service", service);
-        runner.setProperty(service, MongoDBControllerService.DATABASE_NAME, DB_NAME);
-        runner.setProperty(service, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
         runner.setProperty(service, MongoDBControllerService.URI, "mongodb://localhost:27017");
         runner.enableControllerService(service);
     }
 
     @After
     public void after() throws Exception {
-        service.dropDatabase();
         service.onDisable();
     }
 
@@ -62,116 +50,4 @@ public class MongoDBControllerServiceIT {
     public void testInit() throws Exception {
         runner.assertValid(service);
     }
-
-    @Test
-    public void testBasicCRUD() throws Exception {
-        Document doc = service.convertJson("{\n" +
-                "\t\"uuid\": \"x-y-z\",\n" +
-                "\t\"message\": \"Testing!\"\n" +
-                "}");
-        Document lookup = service.convertJson("{ \"uuid\": \"x-y-z\" }");
-        Document update = service.convertJson("{\n" +
-                "\t\"$set\": {\n" +
-                "\t\t\"updatedBy\": \"testUser\"\n" +
-                "\t}\n" +
-                "}");
-
-        service.insert(doc);
-        Document result = service.findOne(lookup);
-
-        Assert.assertNotNull("The result was null", result);
-        Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z");
-        Assert.assertNotNull("The message block was missing", result.getString("message"));
-        Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!");
-
-        service.update(lookup, update, false);
-
-        result = service.findOne(lookup);
-
-        Assert.assertNotNull("The result was null", result);
-        Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z");
-        Assert.assertNotNull("The message block was missing", result.getString("message"));
-        Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!");
-        Assert.assertNotNull("The updatedBy block was missing", result.getString("updatedBy"));
-        Assert.assertEquals("The updatedBy block did not match", result.getString("updatedBy"), "testUser");
-
-        service.delete(lookup);
-
-        boolean exists = service.exists(lookup);
-
-        Assert.assertFalse("After the delete, the document still existed", exists);
-    }
-
-    @Test
-    public void testMultipleCRUD() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        List<Document> sampleDocuments = new ArrayList<>();
-        List<String> uuids = new ArrayList<>();
-        Map<String, Object> mappings = new HashMap<>();
-        Random random = new Random();
-        int count = random.nextInt(1000);
-        for (int x = 0; x < count; x++) {
-            Map<String, Object> doc = new HashMap<>();
-            String uuid = UUID.randomUUID().toString();
-            String ts = Calendar.getInstance().getTime().toString();
-            uuids.add(uuid);
-            mappings.put(uuid, ts);
-
-            doc.put("uuid", uuid);
-            doc.put("timestamp", ts);
-            doc.put("randomNumber", random.nextInt(10));
-
-            String json = mapper.writeValueAsString(doc);
-            sampleDocuments.add(service.convertJson(json));
-        }
-
-        service.insert(sampleDocuments);
-
-        long docCount = service.count(service.convertJson("{}"));
-
-        Assert.assertEquals("The counts did not match", docCount, count);
-        for (String uuid : uuids) {
-            Document lookup = service.convertJson(String.format("{ \"uuid\": \"%s\" }", uuid));
-            Document result = service.findOne(lookup);
-            Assert.assertNotNull("The document was not found", result);
-            Assert.assertEquals("The uuid did not match", result.getString("uuid"), uuid);
-            Assert.assertEquals("The timestamp did not match", result.getString("timestamp"), mappings.get(uuid));
-        }
-
-        Document query = service.convertJson("{ \"randomNumber\": 5 }");
-        docCount = service.count(query);
-        List<Document> results = service.findMany(query);
-
-        Assert.assertTrue("Count should have been >= 1", docCount >= 1);
-        Assert.assertNotNull("Result set was null", results);
-        Assert.assertEquals("The counts did not match up", docCount, results.size());
-    }
-
-    @Test
-    public void testUpsert() throws Exception {
-        Document query = service.convertJson(String.format("{ \"uuid\": \"%s\" }", UUID.randomUUID().toString()));
-        Document update = service.convertJson("{ \"$set\": { \"message\": \"Hello, world\"  } }");
-        service.upsert(query, update);
-
-        Document result = service.findOne(query);
-        Assert.assertNotNull("No result returned", result);
-        Assert.assertEquals("UUID did not match", result.getString("uuid"), query.getString("uuid"));
-        Assert.assertEquals("Message did not match", result.getString("message"), "Hello, world");
-
-        Map<String, String> mappings = new HashMap<>();
-        for (int x = 0; x < 5; x++) {
-            String fieldName = String.format("field_%d", x);
-            String uuid = UUID.randomUUID().toString();
-            mappings.put(fieldName, uuid);
-            update = service.convertJson(String.format("{ \"$set\": { \"%s\": \"%s\" } }", fieldName, uuid));
-
-            service.upsert(query, update);
-        }
-
-        result = service.findOne(query);
-
-        for (Map.Entry<String, String> entry : mappings.entrySet()) {
-            Assert.assertEquals("Entry did not match.", entry.getValue(), result.getString(entry.getKey()));
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
index 2c7f522..c3ae905 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.mongodb;
 
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -49,6 +51,8 @@ public class MongoDBLookupServiceIT {
     private TestRunner runner;
     private MongoDBLookupService service;
     private MongoDBControllerService controllerService;
+    private MongoDatabase db;
+    private MongoCollection col;
 
     @Before
     public void before() throws Exception {
@@ -58,8 +62,8 @@ public class MongoDBLookupServiceIT {
         runner.addControllerService("Client Service", service);
         runner.addControllerService("Client Service 2", controllerService);
         runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service");
-        runner.setProperty(controllerService, MongoDBControllerService.DATABASE_NAME, DB_NAME);
-        runner.setProperty(controllerService, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
+        runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME);
+        runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME);
         runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017");
         runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
         runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2");
@@ -70,11 +74,14 @@ public class MongoDBLookupServiceIT {
         runner.enableControllerService(registry);
         runner.enableControllerService(controllerService);
         runner.enableControllerService(service);
+
+        db = controllerService.getDatabase(DB_NAME);
+        col = db.getCollection(COL_NAME);
     }
 
     @After
     public void after() {
-        controllerService.dropDatabase();
+        db.drop();
         controllerService.onDisable();
     }
 
@@ -90,7 +97,7 @@ public class MongoDBLookupServiceIT {
         runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
         runner.enableControllerService(service);
         Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
-        controllerService.insert(document);
+        col.insertOne(document);
 
         Map<String, Object> criteria = new HashMap<>();
         criteria.put("uuid", "x-y-z");
@@ -101,7 +108,7 @@ public class MongoDBLookupServiceIT {
 
         Map<String, Object> clean = new HashMap<>();
         clean.putAll(criteria);
-        controllerService.delete(new Document(clean));
+        col.deleteOne(new Document(clean));
 
         try {
             result = service.lookup(criteria);
@@ -116,7 +123,7 @@ public class MongoDBLookupServiceIT {
     public void testWithSchemaRegistry() throws Exception {
         runner.assertValid();
 
-        controllerService.insert(new Document()
+        col.insertOne(new Document()
             .append("username", "john.smith")
             .append("password", "testing1234")
         );
@@ -157,7 +164,7 @@ public class MongoDBLookupServiceIT {
         runner.enableControllerService(service);
         runner.assertValid();
 
-        controllerService.insert(new Document().append("msg", "Testing1234"));
+        col.insertOne(new Document().append("msg", "Testing1234"));
 
         Map<String, Object> criteria = new HashMap<>();
         criteria.put("msg", "Testing1234");
@@ -180,7 +187,7 @@ public class MongoDBLookupServiceIT {
         Timestamp ts = new Timestamp(new Date().getTime());
         List list = Arrays.asList("a", "b", "c", "d", "e");
 
-        controllerService.insert(new Document()
+        col.insertOne(new Document()
             .append("uuid", "x-y-z")
             .append("dateField", d)
             .append("longField", 10000L)
@@ -217,7 +224,7 @@ public class MongoDBLookupServiceIT {
 
         Map<String, Object> clean = new HashMap<>();
         clean.putAll(criteria);
-        controllerService.delete(new Document(clean));
+        col.deleteOne(new Document(clean));
 
         try {
             result = service.lookup(criteria);
@@ -231,7 +238,7 @@ public class MongoDBLookupServiceIT {
     @Test
     public void testServiceParameters() {
         Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
-        controllerService.insert(document);
+        col.insertOne(document);
 
         Map<String, Object> criteria = new HashMap<>();
         criteria.put("uuid", "x-y-z");

http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
index fef2f92..eecb467 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
@@ -38,6 +38,27 @@
         <mongo.driver.version>3.2.2</mongo.driver.version>
     </properties>
 
+    <profiles>
+        <profile>
+            <id>3.4</id>
+            <properties>
+                <mongo.driver.version>3.4.3</mongo.driver.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>3.6</id>
+            <properties>
+                <mongo.driver.version>3.6.4</mongo.driver.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>3.8</id>
+            <properties>
+                <mongo.driver.version>3.8.0</mongo.driver.version>
+            </properties>
+        </profile>
+    </profiles>
+
     <dependencyManagement>
         <dependencies>
             <dependency>