You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/09/19 11:34:45 UTC
nifi git commit: NIFI-5239 Made a client service an optional source
of connection pooling in Mongo processors.
Repository: nifi
Updated Branches:
refs/heads/master c56a7e9ba -> 1dea8faa0
NIFI-5239 Made a client service an optional source of connection pooling in Mongo processors.
This closes #2896
Signed-off-by: zenfenan <ze...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1dea8faa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1dea8faa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1dea8faa
Branch: refs/heads/master
Commit: 1dea8faa06fc954f6ee7e27082d25b63b65c231e
Parents: c56a7e9
Author: Mike Thomsen <mi...@gmail.com>
Authored: Sun Jul 15 08:07:21 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Wed Sep 19 17:04:25 2018 +0530
----------------------------------------------------------------------
.../nifi-mongodb-client-service-api/pom.xml | 11 +
.../nifi/mongodb/MongoDBClientService.java | 73 ++++--
.../nifi-mongodb-processors/pom.xml | 11 +
.../mongodb/AbstractMongoProcessor.java | 77 +++++--
.../nifi/processors/mongodb/PutMongoRecord.java | 5 +-
.../nifi/processors/mongodb/DeleteMongoIT.java | 22 ++
.../nifi/processors/mongodb/GetMongoIT.java | 21 +-
.../nifi/processors/mongodb/PutMongoIT.java | 22 +-
.../processors/mongodb/PutMongoRecordIT.java | 27 ++-
.../mongodb/RunMongoAggregationIT.java | 24 ++
.../AbstractMongoDBControllerService.java | 228 -------------------
.../nifi/mongodb/MongoDBControllerService.java | 217 +++++++++---------
.../nifi/mongodb/MongoDBLookupService.java | 40 +++-
.../mongodb/MongoDBControllerServiceIT.java | 124 ----------
.../nifi/mongodb/MongoDBLookupServiceIT.java | 27 ++-
nifi-nar-bundles/nifi-mongodb-bundle/pom.xml | 21 ++
16 files changed, 427 insertions(+), 523 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
index efd66ab..88f0641 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/pom.xml
@@ -41,5 +41,16 @@
<artifactId>mongo-java-driver</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-utils</artifactId>
+ <version>1.8.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
index 5a3a4b2..2ae9265 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
@@ -17,33 +17,66 @@
package org.apache.nifi.mongodb;
+import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
-import java.util.List;
-
public interface MongoDBClientService extends ControllerService {
+ String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
+ String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
+ String WRITE_CONCERN_FSYNCED = "FSYNCED";
+ String WRITE_CONCERN_JOURNALED = "JOURNALED";
+ String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
+ String WRITE_CONCERN_MAJORITY = "MAJORITY";
+
+ PropertyDescriptor URI = new PropertyDescriptor.Builder()
+ .name("mongo-uri")
+ .displayName("Mongo URI")
+ .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.URI_VALIDATOR)
+ .build();
+ PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ + "connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+ .name("ssl-client-auth")
+ .displayName("Client Auth")
+ .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ + "has been defined and enabled.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue("REQUIRED")
+ .build();
+
+ PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
+ .name("mongo-write-concern")
+ .displayName("Write Concern")
+ .description("The write concern to use")
+ .required(true)
+ .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
+ WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
+ .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
+ .build();
+
+
default Document convertJson(String query) {
return Document.parse(query);
}
-
- long count(Document query);
- void delete(Document query);
- boolean exists(Document query);
- Document findOne(Document query);
- Document findOne(Document query, Document projection);
- List<Document> findMany(Document query);
- List<Document> findMany(Document query, int limit);
- List<Document> findMany(Document query, Document sort, int limit);
- void insert(Document doc);
- void insert(List<Document> docs);
- void update(Document query, Document update);
- void update(Document query, Document update, boolean multiple);
- void updateOne(Document query, Document update);
- void upsert(Document query, Document update);
- void dropDatabase();
- void dropCollection();
-
+ WriteConcern getWriteConcern(final ConfigurationContext context);
MongoDatabase getDatabase(String name);
+ String getURI();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index c36bf7d..6860340 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -98,5 +98,16 @@
<version>1.8.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mongodb-client-service-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mongodb-services</artifactId>
+ <version>1.8.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 507aa42..22f25b5 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -32,9 +32,11 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -52,6 +54,8 @@ import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -71,16 +75,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
"Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions.");
- protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ .name("mongo-client-service")
+ .displayName("Client Service")
+ .description("If configured, this property will use the assigned client service for connection pooling.")
+ .required(false)
+ .identifiesControllerService(MongoDBClientService.class)
+ .build();
+
+ static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
.name("Mongo URI")
.displayName("Mongo URI")
.description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
- .required(true)
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("Mongo Database Name")
.displayName("Mongo Database Name")
.description("The name of the database to use")
@@ -89,7 +101,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
.name("Mongo Collection Name")
.description("The name of the collection to use")
.required(true)
@@ -199,21 +211,30 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static List<PropertyDescriptor> descriptors = new ArrayList<>();
+ static final List<PropertyDescriptor> descriptors;
static {
- descriptors.add(URI);
- descriptors.add(DATABASE_NAME);
- descriptors.add(COLLECTION_NAME);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(CLIENT_AUTH);
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.add(CLIENT_SERVICE);
+ _temp.add(URI);
+ _temp.add(DATABASE_NAME);
+ _temp.add(COLLECTION_NAME);
+ _temp.add(SSL_CONTEXT_SERVICE);
+ _temp.add(CLIENT_AUTH);
+ descriptors = Collections.unmodifiableList(_temp);
}
protected ObjectMapper objectMapper;
protected MongoClient mongoClient;
+ protected MongoDBClientService clientService;
@OnScheduled
public final void createClient(ProcessContext context) throws IOException {
+ if (context.getProperty(CLIENT_SERVICE).isSet()) {
+ clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+ return;
+ }
+
if (mongoClient != null) {
closeClient();
}
@@ -270,20 +291,10 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
}
}
- protected MongoDatabase getDatabase(final ProcessContext context) {
- return getDatabase(context, null);
- }
-
protected MongoDatabase getDatabase(final ProcessContext context, final FlowFile flowFile) {
final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(databaseName)) {
- throw new ProcessException("Database name was empty after expression language evaluation.");
- }
- return mongoClient.getDatabase(databaseName);
- }
- protected MongoCollection<Document> getCollection(final ProcessContext context) {
- return getCollection(context, null);
+ return clientService!= null ? clientService.getDatabase(databaseName) : mongoClient.getDatabase(databaseName);
}
protected MongoCollection<Document> getCollection(final ProcessContext context, final FlowFile flowFile) {
@@ -295,7 +306,11 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
}
protected String getURI(final ProcessContext context) {
- return context.getProperty(URI).evaluateAttributeExpressions().getValue();
+ if (clientService != null) {
+ return clientService.getURI();
+ } else {
+ return context.getProperty(URI).evaluateAttributeExpressions().getValue();
+ }
}
protected WriteConcern getWriteConcern(final ProcessContext context) {
@@ -346,4 +361,22 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
objectMapper.setDateFormat(df);
}
}
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext context) {
+ List<ValidationResult> retVal = new ArrayList<>();
+
+ boolean clientIsSet = context.getProperty(CLIENT_SERVICE).isSet();
+ boolean uriIsSet = context.getProperty(URI).isSet();
+
+ if (clientIsSet && uriIsSet) {
+ String msg = "The client service and URI fields cannot be set at the same time.";
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ } else if (!clientIsSet && !uriIsSet) {
+ String msg = "The client service or the URI field must be set.";
+ retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
+ }
+
+ return retVal;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index d3698bb..5fbc819 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -150,7 +150,10 @@ public class PutMongoRecord extends AbstractMongoProcessor {
error = true;
} finally {
if (!error) {
- session.getProvenanceReporter().send(flowFile, context.getProperty(URI).evaluateAttributeExpressions().getValue(), String.format("Added %d documents to MongoDB.", added));
+ String url = clientService != null
+ ? clientService.getURI()
+ : context.getProperty(URI).evaluateAttributeExpressions().getValue();
+ session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
index d10dfc7..9f3c016 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/DeleteMongoIT.java
@@ -19,6 +19,8 @@
package org.apache.nifi.processors.mongodb;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.util.TestRunner;
import org.bson.Document;
import org.junit.After;
@@ -119,4 +121,24 @@ public class DeleteMongoIT extends MongoWriteTestBase {
Assert.assertEquals("A document was deleted", 3, collection.count(Document.parse("{}")));
}
+
+ @Test
+ public void testClientService() throws Exception {
+ MongoDBClientService clientService = new MongoDBControllerService();
+ TestRunner runner = init(DeleteMongo.class);
+ runner.addControllerService("clientService", clientService);
+ runner.removeProperty(DeleteMongo.URI);
+ runner.setProperty(DeleteMongo.DELETE_MODE, DeleteMongo.DELETE_MANY);
+ runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+ runner.setProperty(DeleteMongo.CLIENT_SERVICE, "clientService");
+ runner.enableControllerService(clientService);
+ runner.assertValid();
+
+ runner.enqueue("{}");
+ runner.run();
+ runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
+ runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
+
+ Assert.assertEquals(0, collection.count());
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 42045c5..76139d5 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -26,6 +26,8 @@ import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
@@ -107,9 +109,8 @@ public class GetMongoIT {
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
- Assert.assertEquals(3, results.size());
+ Assert.assertEquals(2, results.size());
Iterator<ValidationResult> it = results.iterator();
- Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
@@ -577,6 +578,20 @@ public class GetMongoIT {
Pattern format = Pattern.compile("([\\d]{4})-([\\d]{2})-([\\d]{2})");
Assert.assertTrue(result.containsKey("date_field"));
- Assert.assertTrue(format.matcher((String)result.get("date_field")).matches());
+ Assert.assertTrue(format.matcher((String) result.get("date_field")).matches());
+ }
+
+ public void testClientService() throws Exception {
+ MongoDBClientService clientService = new MongoDBControllerService();
+ runner.addControllerService("clientService", clientService);
+ runner.removeProperty(GetMongo.URI);
+ runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+ runner.setProperty(GetMongo.CLIENT_SERVICE, "clientService");
+ runner.enableControllerService(clientService);
+ runner.assertValid();
+
+ runner.enqueue("{}");
+ runner.run();
+ runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
index 6ceff7b..de08a80 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.mongodb;
import com.mongodb.client.MongoCursor;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
@@ -69,9 +71,8 @@ public class PutMongoIT extends MongoWriteTestBase {
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
- Assert.assertEquals(3, results.size());
+ Assert.assertEquals(2, results.size());
Iterator<ValidationResult> it = results.iterator();
- Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
@@ -502,4 +503,21 @@ public class PutMongoIT extends MongoWriteTestBase {
index++;
}
}
+
+ @Test
+ public void testClientService() throws Exception {
+ MongoDBClientService clientService = new MongoDBControllerService();
+ TestRunner runner = init(PutMongo.class);
+ runner.addControllerService("clientService", clientService);
+ runner.removeProperty(PutMongo.URI);
+ runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+ runner.setProperty(PutMongo.CLIENT_SERVICE, "clientService");
+ runner.enableControllerService(clientService);
+ runner.assertValid();
+
+ runner.enqueue("{ \"msg\": \"Hello, world\" }");
+ runner.run();
+ runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index 2a24b32..371f38c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -21,6 +21,8 @@ import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -94,9 +96,8 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
- Assert.assertEquals(4, results.size());
+ Assert.assertEquals(3, results.size());
Iterator<ValidationResult> it = results.iterator();
- Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
Assert.assertTrue(it.next().toString().contains("is invalid because Record Reader is required"));
@@ -144,12 +145,30 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
runner.run();
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
- MockFlowFile out = runner.getFlowFilesForRelationship(PutMongoRecord.REL_SUCCESS).get(0);
-
// verify 1 doc inserted into the collection
assertEquals(5, collection.count());
//assertEquals(doc, collection.find().first());
+
+
+ runner.clearTransferState();
+
+ /*
+ * Test it with the client service.
+ */
+ MongoDBClientService clientService = new MongoDBControllerService();
+ runner.addControllerService("clientService", clientService);
+ runner.removeProperty(PutMongoRecord.URI);
+ runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+ runner.setProperty(PutMongoRecord.CLIENT_SERVICE, "clientService");
+ runner.enableControllerService(clientService);
+ runner.assertValid();
+
+ collection.deleteMany(new Document());
+ runner.enqueue("");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
+ assertEquals(5, collection.count());
}
@Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
index 02d9ad4..c74692c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -212,4 +214,26 @@ public class RunMongoAggregationIT {
Assert.assertTrue("Missing $group", queryAttr.contains("$group"));
}
}
+
+ @Test
+ public void testClientService() throws Exception {
+ MongoDBClientService clientService = new MongoDBControllerService();
+ runner.addControllerService("clientService", clientService);
+ runner.removeProperty(RunMongoAggregation.URI);
+ runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
+ runner.setProperty(RunMongoAggregation.CLIENT_SERVICE, "clientService");
+ runner.setProperty(RunMongoAggregation.QUERY, "[\n" +
+ " {\n" +
+ " \"$project\": {\n" +
+ " \"_id\": 0,\n" +
+ " \"val\": 1\n" +
+ " }\n" +
+ " }]");
+ runner.enableControllerService(clientService);
+ runner.assertValid();
+
+ runner.enqueue("{}");
+ runner.run();
+ runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 9);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java
deleted file mode 100644
index 5b6c97e..0000000
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.mongodb;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
-import com.mongodb.MongoClientOptions.Builder;
-import com.mongodb.MongoClientURI;
-import com.mongodb.WriteConcern;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.authentication.exception.ProviderCreationException;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.security.util.SslContextFactory;
-import org.apache.nifi.ssl.SSLContextService;
-import org.bson.Document;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class AbstractMongoDBControllerService extends AbstractControllerService {
- static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
- static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
- static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
- static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
- static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
- static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
-
- protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
- .name("mongo-uri")
- .displayName("Mongo URI")
- .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(Validation.DOCUMENT_VALIDATOR)
- .build();
- protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
- .name("mongo-db-name")
- .displayName("Mongo Database Name")
- .description("The name of the database to use")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
- .name("mongo-collection-name")
- .displayName("Mongo Collection Name")
- .description("The name of the collection to use")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("ssl-context-service")
- .displayName("SSL Context Service")
- .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
- + "connections.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
- public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
- .name("ssl-client-auth")
- .displayName("Client Auth")
- .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
- + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
- + "has been defined and enabled.")
- .required(false)
- .allowableValues(SSLContextService.ClientAuth.values())
- .defaultValue("REQUIRED")
- .build();
-
- public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
- .name("mongo-write-concern")
- .displayName("Write Concern")
- .description("The write concern to use")
- .required(true)
- .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
- WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
- .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
- .build();
-
- static List<PropertyDescriptor> descriptors = new ArrayList<>();
-
- static {
- descriptors.add(URI);
- descriptors.add(DATABASE_NAME);
- descriptors.add(COLLECTION_NAME);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(CLIENT_AUTH);
- }
-
- protected MongoClient mongoClient;
-
- protected final void createClient(ConfigurationContext context) throws IOException {
- if (mongoClient != null) {
- closeClient();
- }
-
- getLogger().info("Creating MongoClient");
-
- // Set up the client for secure (SSL/TLS communications) if configured to do so
- final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
- final SSLContext sslContext;
-
- if (sslService != null) {
- final SSLContextService.ClientAuth clientAuth;
- if (StringUtils.isBlank(rawClientAuth)) {
- clientAuth = SSLContextService.ClientAuth.REQUIRED;
- } else {
- try {
- clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
- } catch (final IllegalArgumentException iae) {
- throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
- rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
- }
- }
- sslContext = sslService.createSSLContext(clientAuth);
- } else {
- sslContext = null;
- }
-
- try {
- if(sslContext == null) {
- mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
- } else {
- mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
- }
- } catch (Exception e) {
- getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
- throw e;
- }
- }
-
- protected Builder getClientOptions(final SSLContext sslContext) {
- MongoClientOptions.Builder builder = MongoClientOptions.builder();
- builder.sslEnabled(true);
- builder.socketFactory(sslContext.getSocketFactory());
- return builder;
- }
-
- @OnStopped
- public final void closeClient() {
- if (mongoClient != null) {
- mongoClient.close();
- mongoClient = null;
- }
- }
-
- protected MongoDatabase getDatabase(final ConfigurationContext context) {
- return getDatabase(context, null);
- }
-
- protected MongoDatabase getDatabase(final ConfigurationContext context, final FlowFile flowFile) {
- final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- return mongoClient.getDatabase(databaseName);
- }
-
- protected MongoCollection<Document> getCollection(final ConfigurationContext context) {
- return getCollection(context, null);
- }
-
- protected MongoCollection<Document> getCollection(final ConfigurationContext context, final FlowFile flowFile) {
- final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
- return getDatabase(context, flowFile).getCollection(collectionName);
- }
-
- protected String getURI(final ConfigurationContext context) {
- return context.getProperty(URI).evaluateAttributeExpressions().getValue();
- }
-
- protected WriteConcern getWriteConcern(final ConfigurationContext context) {
- final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
- WriteConcern writeConcern = null;
- switch (writeConcernProperty) {
- case WRITE_CONCERN_ACKNOWLEDGED:
- writeConcern = WriteConcern.ACKNOWLEDGED;
- break;
- case WRITE_CONCERN_UNACKNOWLEDGED:
- writeConcern = WriteConcern.UNACKNOWLEDGED;
- break;
- case WRITE_CONCERN_FSYNCED:
- writeConcern = WriteConcern.FSYNCED;
- break;
- case WRITE_CONCERN_JOURNALED:
- writeConcern = WriteConcern.JOURNALED;
- break;
- case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
- writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
- break;
- case WRITE_CONCERN_MAJORITY:
- writeConcern = WriteConcern.MAJORITY;
- break;
- default:
- writeConcern = WriteConcern.ACKNOWLEDGED;
- }
- return writeConcern;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index a1f9b2b..a851ad6 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -17,156 +17,159 @@
package org.apache.nifi.mongodb;
-import com.mongodb.client.FindIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientURI;
+import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.UpdateOptions;
-
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.reporting.InitializationException;
-import org.bson.Document;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
-import java.io.IOException;
+import javax.net.ssl.SSLContext;
import java.util.ArrayList;
import java.util.List;
@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
- "Provides a controller service that wraps most of the functionality of the MongoDB driver."
+ "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
+ "other Mongo-related components."
)
-public class MongoDBControllerService extends AbstractMongoDBControllerService implements MongoDBClientService {
- private MongoDatabase db;
- private MongoCollection<Document> col;
+public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
+ private String uri;
@OnEnabled
- public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+ public void onEnabled(final ConfigurationContext context) {
+ this.uri = context.getProperty(URI).evaluateAttributeExpressions().getValue();
this.createClient(context);
- this.db = this.mongoClient.getDatabase(context.getProperty(MongoDBControllerService.DATABASE_NAME).getValue());
- this.col = this.db.getCollection(context.getProperty(MongoDBControllerService.COLLECTION_NAME).getValue());
- }
-
- @OnDisabled
- public void onDisable() {
- this.mongoClient.close();
- }
-
- @Override
- public long count(Document query) {
- return this.col.count(query);
- }
-
- @Override
- public void delete(Document query) {
- this.col.deleteMany(query);
}
- @Override
- public boolean exists(Document query) {
- return this.col.count(query) > 0;
- }
-
- @Override
- public Document findOne(Document query) {
- MongoCursor<Document> cursor = this.col.find(query).limit(1).iterator();
- Document retVal = cursor.tryNext();
- cursor.close();
-
- return retVal;
- }
+ static List<PropertyDescriptor> descriptors = new ArrayList<>();
- @Override
- public Document findOne(Document query, Document projection) {
- MongoCursor<Document> cursor = projection != null
- ? this.col.find(query).projection(projection).limit(1).iterator()
- : this.col.find(query).limit(1).iterator();
- Document retVal = cursor.tryNext();
- cursor.close();
-
- return retVal;
- }
-
- @Override
- public List<Document> findMany(Document query) {
- return findMany(query, null, -1);
+ static {
+ descriptors.add(URI);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(CLIENT_AUTH);
}
- @Override
- public List<Document> findMany(Document query, int limit) {
- return findMany(query, null, limit);
- }
+ protected MongoClient mongoClient;
- @Override
- public List<Document> findMany(Document query, Document sort, int limit) {
- FindIterable<Document> fi = this.col.find(query);
- if (limit > 0) {
- fi = fi.limit(limit);
- }
- if (sort != null) {
- fi = fi.sort(sort);
- }
- MongoCursor<Document> cursor = fi.iterator();
- List<Document> retVal = new ArrayList<>();
- while (cursor.hasNext()) {
- retVal.add(cursor.next());
+ protected final void createClient(ConfigurationContext context) {
+ if (mongoClient != null) {
+ closeClient();
}
- cursor.close();
- return retVal;
- }
+ getLogger().info("Creating MongoClient");
+
+ // Set up the client for secure (SSL/TLS communications) if configured to do so
+ final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+ final SSLContext sslContext;
+
+ if (sslService != null) {
+ final SSLContextService.ClientAuth clientAuth;
+ if (StringUtils.isBlank(rawClientAuth)) {
+ clientAuth = SSLContextService.ClientAuth.REQUIRED;
+ } else {
+ try {
+ clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+ } catch (final IllegalArgumentException iae) {
+ throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+ rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+ }
+ }
+ sslContext = sslService.createSSLContext(clientAuth);
+ } else {
+ sslContext = null;
+ }
- @Override
- public void insert(Document doc) {
- this.col.insertOne(doc);
+ try {
+ if(sslContext == null) {
+ mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
+ } else {
+ mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
+ }
+ } catch (Exception e) {
+ getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
+ throw e;
+ }
}
- @Override
- public void insert(List<Document> docs) {
- this.col.insertMany(docs);
+    /** Builds TLS-enabled client options whose sockets come from the supplied SSLContext. */
+    protected MongoClientOptions.Builder getClientOptions(final SSLContext sslContext) {
+        return MongoClientOptions.builder()
+                .sslEnabled(true)
+                .socketFactory(sslContext.getSocketFactory());
     }
- @Override
- public void update(Document query, Document update, boolean multiple) {
- if (multiple) {
- this.col.updateMany(query, update);
- } else {
- this.col.updateOne(query, update);
+ @OnStopped // NOTE(review): onDisable() also closes mongoClient but neither null-checks nor clears it — confirm the two never both run
+ public final void closeClient() { // closes the current MongoClient, if any; does nothing when none is held
+ if (mongoClient != null) {
+ mongoClient.close();
+ mongoClient = null; // cleared so a repeated call is a no-op
}
}
- @Override
- public void update(Document query, Document update) {
- update(query, update, true);
+ protected String getURI(final ConfigurationContext context) { // returns the URI property of the given context
+ return context.getProperty(URI).evaluateAttributeExpressions().getValue(); // expression language is evaluated without a FlowFile
+ }
+
+    /**
+     * Maps the configured write-concern name onto the driver's {@link WriteConcern} constant.
+     * The value is switched on directly, so a {@code null} property value would raise a
+     * {@code NullPointerException}; any unrecognized name falls back to ACKNOWLEDGED.
+     *
+     * @param context context whose WRITE_CONCERN property is read
+     * @return the matching {@link WriteConcern} constant
+     */
+    @Override
+    public WriteConcern getWriteConcern(final ConfigurationContext context) {
+        final String configuredName = context.getProperty(WRITE_CONCERN).getValue();
+        switch (configuredName) {
+            case WRITE_CONCERN_UNACKNOWLEDGED:
+                return WriteConcern.UNACKNOWLEDGED;
+            case WRITE_CONCERN_FSYNCED:
+                return WriteConcern.FSYNCED;
+            case WRITE_CONCERN_JOURNALED:
+                return WriteConcern.JOURNALED;
+            case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
+                return WriteConcern.REPLICA_ACKNOWLEDGED;
+            case WRITE_CONCERN_MAJORITY:
+                return WriteConcern.MAJORITY;
+            case WRITE_CONCERN_ACKNOWLEDGED:
+            default:
+                // explicit ACKNOWLEDGED and every unknown name share one result
+                return WriteConcern.ACKNOWLEDGED;
+        }
     }
@Override
- public void updateOne(Document query, Document update) {
- this.update(query, update, false);
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
}
- @Override
- public void upsert(Document query, Document update) {
- this.col.updateOne(query, update, new UpdateOptions().upsert(true));
+ @OnDisabled
+ public void onDisable() {
+ this.mongoClient.close();
}
- @Override
- public void dropDatabase() {
- this.db.drop();
- this.col = null;
- }
@Override
- public void dropCollection() {
- this.col.drop();
- this.col = null;
+ public MongoDatabase getDatabase(String name) {
+ return mongoClient.getDatabase(name);
}
@Override
- public MongoDatabase getDatabase(String name) {
- return mongoClient.getDatabase(name);
+ public String getURI() {
+ return uri;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
index cd1829a..be8aa89 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBLookupService.java
@@ -17,6 +17,8 @@
package org.apache.nifi.mongodb;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -24,9 +26,11 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.JsonInferenceSchemaRegistryService;
import org.apache.nifi.serialization.record.MapRecord;
@@ -63,6 +67,9 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
"then the entire MongoDB result document minus the _id field will be returned as a record."
)
public class MongoDBLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Object> {
+ private volatile String databaseName;
+ private volatile String collection;
+
public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
.name("mongo-lookup-client-service")
.displayName("Client Service")
@@ -70,7 +77,22 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
.required(true)
.identifiesControllerService(MongoDBClientService.class)
.build();
-
+    public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+            .name("mongo-db-name")
+            .displayName("Mongo Database Name")
+            .description("The name of the database to use")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) // the service reads this with the no-argument evaluateAttributeExpressions(), so no FlowFile attributes are ever available
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+            .name("mongo-collection-name")
+            .displayName("Mongo Collection Name")
+            .description("The name of the collection to use")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) // likewise evaluated without a FlowFile
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
public static final PropertyDescriptor LOOKUP_VALUE_FIELD = new PropertyDescriptor.Builder()
.name("mongo-lookup-value-field")
.displayName("Lookup Value Field")
@@ -113,7 +135,7 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
}
try {
- Document result = projection != null ? controllerService.findOne(query, projection) : controllerService.findOne(query);
+ Document result = findOne(query, projection);
if(result == null) {
return Optional.empty();
@@ -149,6 +171,9 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
this.schemaNameProperty = context.getProperty(SchemaAccessUtils.SCHEMA_NAME).getValue();
+ this.databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions().getValue();
+ this.collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();
+
String configuredProjection = context.getProperty(PROJECTION).isSet()
? context.getProperty(PROJECTION).getValue()
: null;
@@ -187,9 +212,20 @@ public class MongoDBLookupService extends JsonInferenceSchemaRegistryService imp
_temp.add(SCHEMA_BRANCH_NAME);
_temp.add(SCHEMA_TEXT);
_temp.add(CONTROLLER_SERVICE);
+ _temp.add(DATABASE_NAME);
+ _temp.add(COLLECTION_NAME);
_temp.add(LOOKUP_VALUE_FIELD);
_temp.add(PROJECTION);
return Collections.unmodifiableList(_temp);
}
+
+    /** Returns the first document matching {@code query}, optionally projected, or {@code null} when none matches. */
+    private Document findOne(Document query, Document projection) {
+        MongoCollection<Document> col = controllerService.getDatabase(databaseName).getCollection(collection);
+        // limit(1): only the first match is returned; try-with-resources closes the cursor even if iteration throws
+        try (MongoCursor<Document> it = (projection != null ? col.find(query).projection(projection) : col.find(query)).limit(1).iterator()) {
+            return it.hasNext() ? it.next() : null;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
index 33a173c..cd306f9 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
@@ -17,22 +17,13 @@
package org.apache.nifi.mongodb;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.bson.Document;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
public class MongoDBControllerServiceIT {
private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
@@ -46,15 +37,12 @@ public class MongoDBControllerServiceIT {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
service = new MongoDBControllerService();
runner.addControllerService("Client Service", service);
- runner.setProperty(service, MongoDBControllerService.DATABASE_NAME, DB_NAME);
- runner.setProperty(service, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
runner.setProperty(service, MongoDBControllerService.URI, "mongodb://localhost:27017");
runner.enableControllerService(service);
}
@After
public void after() throws Exception {
- service.dropDatabase();
service.onDisable();
}
@@ -62,116 +50,4 @@ public class MongoDBControllerServiceIT {
public void testInit() throws Exception {
runner.assertValid(service);
}
-
- @Test
- public void testBasicCRUD() throws Exception {
- Document doc = service.convertJson("{\n" +
- "\t\"uuid\": \"x-y-z\",\n" +
- "\t\"message\": \"Testing!\"\n" +
- "}");
- Document lookup = service.convertJson("{ \"uuid\": \"x-y-z\" }");
- Document update = service.convertJson("{\n" +
- "\t\"$set\": {\n" +
- "\t\t\"updatedBy\": \"testUser\"\n" +
- "\t}\n" +
- "}");
-
- service.insert(doc);
- Document result = service.findOne(lookup);
-
- Assert.assertNotNull("The result was null", result);
- Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z");
- Assert.assertNotNull("The message block was missing", result.getString("message"));
- Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!");
-
- service.update(lookup, update, false);
-
- result = service.findOne(lookup);
-
- Assert.assertNotNull("The result was null", result);
- Assert.assertEquals("The UUID did not match", result.getString("uuid"), "x-y-z");
- Assert.assertNotNull("The message block was missing", result.getString("message"));
- Assert.assertEquals("The message block did not match", result.getString("message"), "Testing!");
- Assert.assertNotNull("The updatedBy block was missing", result.getString("updatedBy"));
- Assert.assertEquals("The updatedBy block did not match", result.getString("updatedBy"), "testUser");
-
- service.delete(lookup);
-
- boolean exists = service.exists(lookup);
-
- Assert.assertFalse("After the delete, the document still existed", exists);
- }
-
- @Test
- public void testMultipleCRUD() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- List<Document> sampleDocuments = new ArrayList<>();
- List<String> uuids = new ArrayList<>();
- Map<String, Object> mappings = new HashMap<>();
- Random random = new Random();
- int count = random.nextInt(1000);
- for (int x = 0; x < count; x++) {
- Map<String, Object> doc = new HashMap<>();
- String uuid = UUID.randomUUID().toString();
- String ts = Calendar.getInstance().getTime().toString();
- uuids.add(uuid);
- mappings.put(uuid, ts);
-
- doc.put("uuid", uuid);
- doc.put("timestamp", ts);
- doc.put("randomNumber", random.nextInt(10));
-
- String json = mapper.writeValueAsString(doc);
- sampleDocuments.add(service.convertJson(json));
- }
-
- service.insert(sampleDocuments);
-
- long docCount = service.count(service.convertJson("{}"));
-
- Assert.assertEquals("The counts did not match", docCount, count);
- for (String uuid : uuids) {
- Document lookup = service.convertJson(String.format("{ \"uuid\": \"%s\" }", uuid));
- Document result = service.findOne(lookup);
- Assert.assertNotNull("The document was not found", result);
- Assert.assertEquals("The uuid did not match", result.getString("uuid"), uuid);
- Assert.assertEquals("The timestamp did not match", result.getString("timestamp"), mappings.get(uuid));
- }
-
- Document query = service.convertJson("{ \"randomNumber\": 5 }");
- docCount = service.count(query);
- List<Document> results = service.findMany(query);
-
- Assert.assertTrue("Count should have been >= 1", docCount >= 1);
- Assert.assertNotNull("Result set was null", results);
- Assert.assertEquals("The counts did not match up", docCount, results.size());
- }
-
- @Test
- public void testUpsert() throws Exception {
- Document query = service.convertJson(String.format("{ \"uuid\": \"%s\" }", UUID.randomUUID().toString()));
- Document update = service.convertJson("{ \"$set\": { \"message\": \"Hello, world\" } }");
- service.upsert(query, update);
-
- Document result = service.findOne(query);
- Assert.assertNotNull("No result returned", result);
- Assert.assertEquals("UUID did not match", result.getString("uuid"), query.getString("uuid"));
- Assert.assertEquals("Message did not match", result.getString("message"), "Hello, world");
-
- Map<String, String> mappings = new HashMap<>();
- for (int x = 0; x < 5; x++) {
- String fieldName = String.format("field_%d", x);
- String uuid = UUID.randomUUID().toString();
- mappings.put(fieldName, uuid);
- update = service.convertJson(String.format("{ \"$set\": { \"%s\": \"%s\" } }", fieldName, uuid));
-
- service.upsert(query, update);
- }
-
- result = service.findOne(query);
-
- for (Map.Entry<String, String> entry : mappings.entrySet()) {
- Assert.assertEquals("Entry did not match.", entry.getValue(), result.getString(entry.getKey()));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
index 2c7f522..c3ae905 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBLookupServiceIT.java
@@ -17,6 +17,8 @@
package org.apache.nifi.mongodb;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
@@ -49,6 +51,8 @@ public class MongoDBLookupServiceIT {
private TestRunner runner;
private MongoDBLookupService service;
private MongoDBControllerService controllerService;
+ private MongoDatabase db;
+ private MongoCollection col;
@Before
public void before() throws Exception {
@@ -58,8 +62,8 @@ public class MongoDBLookupServiceIT {
runner.addControllerService("Client Service", service);
runner.addControllerService("Client Service 2", controllerService);
runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service");
- runner.setProperty(controllerService, MongoDBControllerService.DATABASE_NAME, DB_NAME);
- runner.setProperty(controllerService, MongoDBControllerService.COLLECTION_NAME, COL_NAME);
+ runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME);
+ runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME);
runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017");
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2");
@@ -70,11 +74,14 @@ public class MongoDBLookupServiceIT {
runner.enableControllerService(registry);
runner.enableControllerService(controllerService);
runner.enableControllerService(service);
+
+ db = controllerService.getDatabase(DB_NAME);
+ col = db.getCollection(COL_NAME);
}
@After
public void after() {
- controllerService.dropDatabase();
+ db.drop();
controllerService.onDisable();
}
@@ -90,7 +97,7 @@ public class MongoDBLookupServiceIT {
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
runner.enableControllerService(service);
Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
- controllerService.insert(document);
+ col.insertOne(document);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
@@ -101,7 +108,7 @@ public class MongoDBLookupServiceIT {
Map<String, Object> clean = new HashMap<>();
clean.putAll(criteria);
- controllerService.delete(new Document(clean));
+ col.deleteOne(new Document(clean));
try {
result = service.lookup(criteria);
@@ -116,7 +123,7 @@ public class MongoDBLookupServiceIT {
public void testWithSchemaRegistry() throws Exception {
runner.assertValid();
- controllerService.insert(new Document()
+ col.insertOne(new Document()
.append("username", "john.smith")
.append("password", "testing1234")
);
@@ -157,7 +164,7 @@ public class MongoDBLookupServiceIT {
runner.enableControllerService(service);
runner.assertValid();
- controllerService.insert(new Document().append("msg", "Testing1234"));
+ col.insertOne(new Document().append("msg", "Testing1234"));
Map<String, Object> criteria = new HashMap<>();
criteria.put("msg", "Testing1234");
@@ -180,7 +187,7 @@ public class MongoDBLookupServiceIT {
Timestamp ts = new Timestamp(new Date().getTime());
List list = Arrays.asList("a", "b", "c", "d", "e");
- controllerService.insert(new Document()
+ col.insertOne(new Document()
.append("uuid", "x-y-z")
.append("dateField", d)
.append("longField", 10000L)
@@ -217,7 +224,7 @@ public class MongoDBLookupServiceIT {
Map<String, Object> clean = new HashMap<>();
clean.putAll(criteria);
- controllerService.delete(new Document(clean));
+ col.deleteOne(new Document(clean));
try {
result = service.lookup(criteria);
@@ -231,7 +238,7 @@ public class MongoDBLookupServiceIT {
@Test
public void testServiceParameters() {
Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
- controllerService.insert(document);
+ col.insertOne(document);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
http://git-wip-us.apache.org/repos/asf/nifi/blob/1dea8faa/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
index fef2f92..eecb467 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
@@ -38,6 +38,27 @@
<mongo.driver.version>3.2.2</mongo.driver.version>
</properties>
+ <profiles>
+ <profile>
+ <id>3.4</id>
+ <properties>
+ <mongo.driver.version>3.4.3</mongo.driver.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>3.6</id>
+ <properties>
+ <mongo.driver.version>3.6.4</mongo.driver.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>3.8</id>
+ <properties>
+ <mongo.driver.version>3.8.0</mongo.driver.version>
+ </properties>
+ </profile>
+ </profiles>
+
<dependencyManagement>
<dependencies>
<dependency>