You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/13 22:29:29 UTC
[nifi] branch master updated: NIFI-4975 Add GridFS processors
NIFI-4975 Added changes requested in a code review. NIFI-4975 Reverted some
base Mongo changes. NIFI-4975 Moved connection configuration to using Mongo
client service. NIFI-4975 Fixed a lot of style issues. NIFI-4975 Removed an
EL statement that was causing problems with the UI. NIFI-4975 Added changes
from code review. NIFI-4975 Added additional details for FetchGridFS.
NIFI-4975 Added documentation for DeleteGridFS. NIFI-4975 Added
documentation for PutGridFS.
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 033b2a1 NIFI-4975 Add GridFS processors NIFI-4975 Added changes requested in a code review. NIFI-4975 Reverted some base Mongo changes. NIFI-4975 Moved connection configuration to using Mongo client service. NIFI-4975 Fixed a lot of style issues. NIFI-4975 Removed an EL statement that was causing problems with the UI. NIFI-4975 Added changes from code review. NIFI-4975 Added additional details for FetchGridFS. NIFI-4975 Added documentation for DeleteGridFS. NIFI-4975 Added docum [...]
033b2a1 is described below
commit 033b2a1940bf97bb213347a53d2d2a6c0b4cd12b
Author: Mike Thomsen <mi...@gmail.com>
AuthorDate: Sat Mar 10 12:57:28 2018 -0500
NIFI-4975 Add GridFS processors
NIFI-4975 Added changes requested in a code review.
NIFI-4975 Reverted some base Mongo changes.
NIFI-4975 Moved connection configuration to using Mongo client service.
NIFI-4975 Fixed a lot of style issues.
NIFI-4975 Removed an EL statement that was causing problems with the UI.
NIFI-4975 Added changes from code review.
NIFI-4975 Added additional details for FetchGridFS.
NIFI-4975 Added documentation for DeleteGridFS.
NIFI-4975 Added documentation for PutGridFS.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2546
---
.../processors/mongodb/AbstractMongoProcessor.java | 34 +--
.../nifi/processors/mongodb/QueryHelper.java | 67 ++++++
.../mongodb/gridfs/AbstractGridFSProcessor.java | 150 +++++++++++++
.../processors/mongodb/gridfs/DeleteGridFS.java | 172 ++++++++++++++
.../processors/mongodb/gridfs/FetchGridFS.java | 205 +++++++++++++++++
.../nifi/processors/mongodb/gridfs/PutGridFS.java | 248 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 5 +-
.../additionalDetails.html | 32 +++
.../additionalDetails.html | 43 ++++
.../additionalDetails.html | 58 +++++
.../processors/mongodb/gridfs/DeleteGridFSIT.java | 110 +++++++++
.../processors/mongodb/gridfs/FetchGridFSIT.java | 186 ++++++++++++++++
.../mongodb/gridfs/GridFSITTestBase.java | 108 +++++++++
.../processors/mongodb/gridfs/PutGridFSIT.java | 185 +++++++++++++++
.../nifi/mongodb/MongoDBControllerService.java | 4 +-
15 files changed, 1587 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 69de94b..43c210c 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -121,24 +121,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("ssl-context-service")
- .displayName("SSL Context Service")
- .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
- + "connections.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
+ .name("ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ + "connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
- .name("ssl-client-auth")
- .displayName("Client Auth")
- .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
- + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
- + "has been defined and enabled.")
- .required(false)
- .allowableValues(SSLContextService.ClientAuth.values())
- .defaultValue("REQUIRED")
- .build();
+ .name("ssl-client-auth")
+ .displayName("Client Auth")
+ .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ + "has been defined and enabled.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue("REQUIRED")
+ .build();
public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
.name("Write Concern")
@@ -341,7 +341,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
}
protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session,
- Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException {
+ Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException {
String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue();
FlowFile flowFile = parent != null ? session.create(parent) : session.create();
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java
new file mode 100644
index 0000000..c31b016
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public interface QueryHelper {
+ AllowableValue MODE_ONE_COMMIT = new AllowableValue("all-at-once", "Full Query Fetch",
+ "Fetch the entire query result and then make it available to downstream processors.");
+ AllowableValue MODE_MANY_COMMITS = new AllowableValue("streaming", "Stream Query Results",
+ "As soon as the query start sending results to the downstream processors at regular intervals.");
+
+ PropertyDescriptor OPERATION_MODE = new PropertyDescriptor.Builder()
+ .name("mongo-operation-mode")
+ .displayName("Operation Mode")
+ .allowableValues(MODE_ONE_COMMIT, MODE_MANY_COMMITS)
+ .defaultValue(MODE_ONE_COMMIT.getValue())
+ .required(true)
+ .description("This option controls when results are made available to downstream processors. If Stream Query Results is enabled, " +
+ "provenance will not be tracked relative to the input flowfile if an input flowfile is received and starts the query. In Stream Query Results mode " +
+ "errors will be handled by sending a new flowfile with the original content and attributes of the input flowfile to the failure " +
+ "relationship. Streaming should only be used if there is reliable connectivity between MongoDB and NiFi.")
+ .addValidator(Validator.VALID)
+ .build();
+
+ default String readQuery(ProcessContext context, ProcessSession session, PropertyDescriptor queryProp, FlowFile input) throws IOException {
+ String queryStr;
+
+ if (context.getProperty(queryProp).isSet()) {
+ queryStr = context.getProperty(queryProp).evaluateAttributeExpressions(input).getValue();
+ } else if (!context.getProperty(queryProp).isSet() && input == null) {
+ queryStr = "{}";
+ } else {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ session.exportTo(input, out);
+ out.close();
+ queryStr = new String(out.toByteArray());
+ }
+
+ return queryStr;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java
new file mode 100644
index 0000000..48368b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSBuckets;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+import org.bson.types.ObjectId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractGridFSProcessor extends AbstractProcessor {
+ static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+ .name("gridfs-client-service")
+ .displayName("Client Service")
+ .description("The MongoDB client service to use for database connections.")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .identifiesControllerService(MongoDBClientService.class)
+ .build();
+
+ static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+ .name("gridfs-database-name")
+ .displayName("Mongo Database Name")
+ .description("The name of the database to use")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder()
+ .name("gridfs-bucket-name")
+ .displayName("Bucket Name")
+ .description("The GridFS bucket where the files will be stored. If left blank, it will use the default value 'fs' " +
+ "that the MongoDB client driver uses.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+ .name("gridfs-file-name")
+ .displayName("File Name")
+ .description("The name of the file in the bucket that is the target of this processor.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
+ .name("mongo-query-attribute")
+ .displayName("Query Output Attribute")
+ .description("If set, the query will be written to a specified attribute on the output flowfiles.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .required(false)
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("When there is a failure processing the flowfile, it goes to this relationship.")
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("When the operation succeeds, the flowfile is sent to this relationship.")
+ .build();
+
+ static final List<PropertyDescriptor> PARENT_PROPERTIES;
+
+ static final Set<Relationship> PARENT_RELATIONSHIPS;
+
+ protected volatile MongoDBClientService clientService;
+
+ static {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.add(CLIENT_SERVICE);
+ _temp.add(DATABASE_NAME);
+ _temp.add(BUCKET_NAME);
+ PARENT_PROPERTIES = Collections.unmodifiableList(_temp);
+
+ Set<Relationship> _rels = new HashSet<>();
+ _rels.add(REL_SUCCESS);
+ _rels.add(REL_FAILURE);
+ PARENT_RELATIONSHIPS = Collections.unmodifiableSet(_rels);
+ }
+
+ protected MongoDatabase getDatabase(FlowFile input, ProcessContext context) {
+ return clientService.getDatabase(context.getProperty(DATABASE_NAME)
+ .evaluateAttributeExpressions(input)
+ .getValue());
+ }
+
+ protected GridFSBucket getBucket(FlowFile input, ProcessContext context) {
+ final String name = getBucketName(input, context);
+ if (StringUtils.isEmpty(name)) {
+ return GridFSBuckets.create(getDatabase(input, context));
+ } else {
+ return GridFSBuckets.create(getDatabase(input, context), name);
+ }
+ }
+
+ protected String getBucketName(FlowFile input, ProcessContext context) {
+ return context.getProperty(BUCKET_NAME).isSet()
+ ? context.getProperty(BUCKET_NAME).evaluateAttributeExpressions(input).getValue()
+ : null;
+ }
+
+ protected String getTransitUri(ObjectId id, FlowFile input, ProcessContext context) {
+ String bucket = getBucketName(input, context);
+ String uri = clientService.getURI();
+ return new StringBuilder()
+ .append(uri)
+ .append(uri.endsWith("/") ? "" : "/")
+ .append(bucket)
+ .append("/")
+ .append(id.toString())
+ .toString();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java
new file mode 100644
index 0000000..680731b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@CapabilityDescription("Deletes a file from GridFS using a file name or a query.")
+@Tags({"gridfs", "delete", "mongodb"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class DeleteGridFS extends AbstractGridFSProcessor {
+ static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+ .name("delete-gridfs-query")
+ .displayName("Query")
+ .description("A valid MongoDB query to use to find and delete one or more files from GridFS.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+ .name("gridfs-file-name")
+ .displayName("File Name")
+ .description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
+ "include path information because GridFS does not sort files into folders within a bucket.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final List<PropertyDescriptor> DESCRIPTORS;
+
+ static {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.addAll(PARENT_PROPERTIES);
+ _temp.add(FILE_NAME);
+ _temp.add(QUERY);
+ _temp.add(QUERY_ATTRIBUTE);
+ DESCRIPTORS = Collections.unmodifiableList(_temp);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return new HashSet<>(PARENT_RELATIONSHIPS);
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ ArrayList<ValidationResult> problems = new ArrayList<>();
+
+ boolean fileName = validationContext.getProperty(FILE_NAME).isSet();
+ boolean query = validationContext.getProperty(QUERY).isSet();
+
+ if (fileName && query) {
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .explanation("File name and Query cannot be set at the same time.")
+ .build()
+ );
+ } else if (!fileName && !query) {
+ problems.add(new ValidationResult.Builder()
+ .valid(false)
+ .explanation("File name or Query must be set, but not both at the same time.")
+ .build()
+ );
+ }
+
+ return problems;
+ }
+
+ private String getQuery(ProcessContext context, FlowFile input) {
+ String queryString;
+ if (context.getProperty(FILE_NAME).isSet()) {
+ String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
+ queryString = String.format("{ \"filename\": \"%s\"}", fileName);
+ } else {
+ queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+ }
+
+ return queryString;
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile input = session.get();
+ if (input == null) {
+ return;
+ }
+
+ final String deleteQuery = getQuery(context, input);
+ final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
+ ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+ : null;
+ GridFSBucket bucket = getBucket(input, context);
+
+ try {
+ Document query = Document.parse(deleteQuery);
+ MongoCursor cursor = bucket.find(query).iterator();
+ if (cursor.hasNext()) {
+ GridFSFile file = (GridFSFile)cursor.next();
+ bucket.delete(file.getObjectId());
+
+ if (!StringUtils.isEmpty(queryAttribute)) {
+ input = session.putAttribute(input, queryAttribute, deleteQuery);
+ }
+
+ session.transfer(input, REL_SUCCESS);
+ } else {
+ getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
+ session.transfer(input, REL_FAILURE);
+ }
+
+ cursor.close();
+ } catch (Exception ex) {
+ getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
+ session.transfer(input, REL_FAILURE);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java
new file mode 100644
index 0000000..11d4b87
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processors.mongodb.QueryHelper;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes(
+ @WritesAttribute(attribute = "gridfs.file.metadata", description = "The custom metadata stored with a file is attached to this property if it exists.")
+)
+@Tags({"fetch", "gridfs", "mongo"})
+@CapabilityDescription("Retrieves one or more files from a GridFS bucket by file name or by a user-defined query.")
+public class FetchGridFS extends AbstractGridFSProcessor implements QueryHelper {
+
+ static final String METADATA_ATTRIBUTE = "gridfs.file.metadata";
+
+ static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+ .name("gridfs-query")
+ .displayName("Query")
+ .description("A valid MongoDB query to use to fetch one or more files from GridFS.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .required(false)
+ .build();
+
+ static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original input flowfile goes to this relationship if the query does not cause an error")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+ static final Set<Relationship> RELATIONSHIP_SET;
+
+ static {
+ List<PropertyDescriptor> _temp = new ArrayList<>();
+ _temp.addAll(PARENT_PROPERTIES);
+ _temp.add(FILE_NAME);
+ _temp.add(QUERY);
+ _temp.add(QUERY_ATTRIBUTE);
+ _temp.add(OPERATION_MODE);
+ PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_temp);
+
+ Set<Relationship> _rels = new HashSet<>();
+ _rels.addAll(PARENT_RELATIONSHIPS);
+ _rels.add(REL_ORIGINAL);
+ RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIP_SET;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ private String getQuery(ProcessSession session, ProcessContext context, FlowFile input) throws IOException {
+ String queryString;
+ if (context.getProperty(FILE_NAME).isSet()) {
+ String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
+ queryString = String.format("{ \"filename\": \"%s\"}", fileName);
+ } else if (context.getProperty(QUERY).isSet()) {
+ queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+ } else {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ session.exportTo(input, out);
+ out.close();
+ queryString = new String(out.toByteArray());
+ }
+
+ return queryString;
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile input = session.get();
+ if (input == null) {
+ return;
+ }
+
+ final String operatingMode = context.getProperty(OPERATION_MODE).getValue();
+ final Map<String, String> originalAttributes = input.getAttributes();
+
+ String queryStr;
+ try {
+ queryStr = getQuery(session, context, input);
+ if (StringUtils.isEmpty(queryStr)) {
+ getLogger().error("No query could be found or built from the supplied input.");
+ session.transfer(input, REL_FAILURE);
+ return;
+ }
+ } catch (IOException ex) {
+ getLogger().error("No query could be found from supplied input", ex);
+ session.transfer(input, REL_FAILURE);
+ return;
+ }
+
+ Document query = Document.parse(queryStr);
+
+ try {
+ final GridFSBucket bucket = getBucket(input, context);
+ final String queryPtr = queryStr;
+ final FlowFile parent = operatingMode.equals(MODE_ONE_COMMIT.getValue()) ? input : null;
+
+ MongoCursor it = bucket.find(query).iterator();
+ if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
+ session.transfer(input, REL_ORIGINAL);
+ input = null;
+ }
+
+ while (it.hasNext()) {
+ GridFSFile gridFSFile = (GridFSFile)it.next();
+ handleFile(bucket, session, context, parent, gridFSFile, queryPtr);
+
+ if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
+ session.commit();
+ }
+ }
+
+ if (input != null) {
+ session.transfer(input, REL_ORIGINAL);
+ }
+ } catch (Exception ex) {
+ getLogger().error("An error occurred wile trying to run the query.", ex);
+ if (input != null && operatingMode.equals(MODE_ONE_COMMIT.getValue())) {
+ session.transfer(input, REL_FAILURE);
+ } else if (input != null && operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
+ final String queryPtr = queryStr;
+ FlowFile cloned = session.create();
+ cloned = session.putAllAttributes(cloned, originalAttributes);
+ cloned = session.write(cloned, out -> out.write(queryPtr.getBytes()));
+ session.transfer(cloned, REL_FAILURE);
+ }
+ }
+ }
+
+ private void handleFile(GridFSBucket bucket, ProcessSession session, ProcessContext context, FlowFile parent, GridFSFile input, String query) {
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(METADATA_ATTRIBUTE, input.getMetadata() != null ? input.getMetadata().toJson() : "{}");
+ if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+ String key = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(parent).getValue();
+ attrs.put(key, query);
+ }
+ attrs.put(CoreAttributes.FILENAME.key(), input.getFilename());
+ FlowFile output = parent != null ? session.create(parent) : session.create();
+ output = session.write(output, out -> bucket.downloadToStream(input.getObjectId(), out));
+ output = session.putAllAttributes(output, attrs);
+ session.transfer(output, REL_SUCCESS);
+ session.getProvenanceReporter().receive(output, getTransitUri(input.getObjectId(), output, context));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java
new file mode 100644
index 0000000..be9dd46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.model.GridFSUploadOptions;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"mongo", "gridfs", "put", "file", "store"})
+@CapabilityDescription("Writes a file to a GridFS bucket.")
+public class PutGridFS extends AbstractGridFSProcessor {
+
+ static final PropertyDescriptor PROPERTIES_PREFIX = new PropertyDescriptor.Builder()
+ .name("putgridfs-properties-prefix")
+ .displayName("File Properties Prefix")
+ .description("Attributes that have this prefix will be added to the file stored in GridFS as metadata.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(Validator.VALID)
+ .build();
+
+ static final AllowableValue NO_UNIQUE = new AllowableValue("none", "None", "No uniqueness will be enforced.");
+ static final AllowableValue UNIQUE_NAME = new AllowableValue("name", "Name", "Only the filename must " +
+ "be unique.");
+ static final AllowableValue UNIQUE_HASH = new AllowableValue("hash", "Hash", "Only the file hash must be " +
+ "unique.");
+ static final AllowableValue UNIQUE_BOTH = new AllowableValue("both", "Both", "Both the filename and hash " +
+ "must be unique.");
+
+ static final PropertyDescriptor ENFORCE_UNIQUENESS = new PropertyDescriptor.Builder()
+ .name("putgridfs-enforce-uniqueness")
+ .displayName("Enforce Uniqueness")
+ .description("When enabled, this option will ensure that uniqueness is enforced on the bucket. It will do so by creating a MongoDB index " +
+ "that matches your selection. It should ideally be configured once when the bucket is created for the first time because " +
+ "it could take a long time to build on an existing bucket wit a lot of data.")
+ .allowableValues(NO_UNIQUE, UNIQUE_BOTH, UNIQUE_NAME, UNIQUE_HASH)
+ .defaultValue(NO_UNIQUE.getValue())
+ .required(true)
+ .build();
+ static final PropertyDescriptor HASH_ATTRIBUTE = new PropertyDescriptor.Builder()
+ .name("putgridfs-hash-attribute")
+ .displayName("Hash Attribute")
+ .description("If uniquness enforcement is enabled and the file hash is part of the constraint, this must be set to an attribute that " +
+ "exists on all incoming flowfiles.")
+ .defaultValue("hash.value")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
+ .name("putgridfs-chunk-size")
+ .displayName("Chunk Size")
+ .description("Controls the maximum size of each chunk of a file uploaded into GridFS.")
+ .defaultValue("256 KB")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+ .name("gridfs-file-name")
+ .displayName("File Name")
+ .description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
+ "include path information because GridFS does not sort files into folders within a bucket.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final Relationship REL_DUPLICATE = new Relationship.Builder()
+ .name("duplicate")
+ .description("Flowfiles that fail the duplicate check are sent to this relationship.")
+ .build();
+
+ static final String ID_ATTRIBUTE = "gridfs.id";
+
+ static final List<PropertyDescriptor> DESCRIPTORS;
+ static final Set<Relationship> RELATIONSHIP_SET;
+
+ static {
+ List _temp = new ArrayList<>();
+ _temp.addAll(PARENT_PROPERTIES);
+ _temp.add(FILE_NAME);
+ _temp.add(PROPERTIES_PREFIX);
+ _temp.add(ENFORCE_UNIQUENESS);
+ _temp.add(HASH_ATTRIBUTE);
+ _temp.add(CHUNK_SIZE);
+ DESCRIPTORS = Collections.unmodifiableList(_temp);
+
+ Set _rels = new HashSet();
+ _rels.addAll(PARENT_RELATIONSHIPS);
+ _rels.add(REL_DUPLICATE);
+ RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
+ }
+
+ private String uniqueness;
+ private String hashAttribute;
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ this.uniqueness = context.getProperty(ENFORCE_UNIQUENESS).getValue();
+ this.hashAttribute = context.getProperty(HASH_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+ this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIP_SET;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile input = session.get();
+ if (input == null) {
+ return;
+ }
+
+ GridFSBucket bucket = getBucket(input, context);
+
+ if (!canUploadFile(context, input, bucket.getBucketName())) {
+ getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
+ session.transfer(input, REL_DUPLICATE);
+ return;
+ }
+
+ final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
+
+ try (InputStream fileInput = session.read(input)) {
+ String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
+ GridFSUploadOptions options = new GridFSUploadOptions()
+ .chunkSizeBytes(chunkSize)
+ .metadata(getMetadata(input, context));
+ ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
+ fileInput.close();
+
+ if (id != null) {
+ input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
+ session.transfer(input, REL_SUCCESS);
+ session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
+ } else {
+ getLogger().error("ID was null, assuming failure.");
+ session.transfer(input, REL_FAILURE);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Failed to upload file", ex);
+ session.transfer(input, REL_FAILURE);
+ }
+ }
+
+ private boolean canUploadFile(ProcessContext context, FlowFile input, String bucketName) {
+ boolean retVal;
+
+ if (uniqueness.equals(NO_UNIQUE.getValue())) {
+ retVal = true;
+ } else {
+ final String fileName = input.getAttribute(CoreAttributes.FILENAME.key());
+ final String fileColl = String.format("%s.files", bucketName);
+ final String hash = input.getAttribute(hashAttribute);
+
+ if ((uniqueness.equals(UNIQUE_BOTH.getValue()) || uniqueness.equals(UNIQUE_HASH.getValue())) && StringUtils.isEmpty(hash)) {
+ throw new RuntimeException(String.format("Uniqueness mode %s was set and the hash attribute %s was not found.", uniqueness, hashAttribute));
+ }
+
+ Document query;
+ if (uniqueness.equals(UNIQUE_BOTH.getValue())) {
+ query = new Document().append("filename", fileName).append("md5", hash);
+ } else if (uniqueness.equals(UNIQUE_HASH.getValue())) {
+ query = new Document().append("md5", hash);
+ } else {
+ query = new Document().append("filename", fileName);
+ }
+
+ retVal = getDatabase(input, context).getCollection(fileColl).count(query) == 0;
+ }
+
+ return retVal;
+ }
+
+ private Document getMetadata(FlowFile input, ProcessContext context) {
+ final String prefix = context.getProperty(PROPERTIES_PREFIX).evaluateAttributeExpressions(input).getValue();
+ Document doc;
+
+ if (StringUtils.isEmpty(prefix)) {
+ doc = Document.parse("{}");
+ } else {
+ doc = new Document();
+ Map<String, String> attributes = input.getAttributes();
+ for (Map.Entry<String, String> entry : attributes.entrySet()) {
+ if (entry.getKey().startsWith(prefix)) {
+ String cleanPrefix = prefix.endsWith(".") ? prefix : String.format("%s.", prefix);
+ String cleanKey = entry.getKey().replace(cleanPrefix, "");
+ doc.append(cleanKey, entry.getValue());
+ }
+ }
+ }
+
+ return doc;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index bfe2b74..3797ca0 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,4 +18,7 @@ org.apache.nifi.processors.mongodb.GetMongo
org.apache.nifi.processors.mongodb.GetMongoRecord
org.apache.nifi.processors.mongodb.RunMongoAggregation
org.apache.nifi.processors.mongodb.PutMongo
-org.apache.nifi.processors.mongodb.PutMongoRecord
\ No newline at end of file
+org.apache.nifi.processors.mongodb.PutMongoRecord
+org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS
+org.apache.nifi.processors.mongodb.gridfs.FetchGridFS
+org.apache.nifi.processors.mongodb.gridfs.PutGridFS
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html
new file mode 100644
index 0000000..b748755
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html
@@ -0,0 +1,32 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>DeleteGridFS</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+ This processor deletes one or more files from GridFS. The query to execute can be either provided in the query
+ configuration parameter or generated from the value pulled from the filename configuration parameter. Upon successful
+ execution, it will append the query that was executed as an attribute on the flowfile that was processed.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html
new file mode 100644
index 0000000..279216c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html
@@ -0,0 +1,43 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>FetchGridFS</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+ This processor retrieves one or more files from GridFS. The query can be provided in one of three ways:
+</p>
+
+<ul>
+ <li>Query configuration parameter.</li>
+ <li>Built for you by configuring the filename parameter. (Note: this is just a filename, Mongo queries cannot be
+ embedded in the field).</li>
+ <li>Retrieving the query from the flowfile contents.</li>
+</ul>
+
+<p>
+ The processor can also be configured to either commit only once at the end of a fetch operation or after each file
+ that is retrieved. Multiple commits are generally only necessary when retrieving a lot of data from GridFS, as measured
+ in total data size rather than file count, to ensure that the disks NiFi is using are not overloaded.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html
new file mode 100644
index 0000000..62330dd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html
@@ -0,0 +1,58 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>PutGridFS</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+ This processor puts a file with one or more user-defined metadata values into GridFS in the configured bucket. It
+ allows the user to define how big each file chunk will be during ingestion and provides some ability to intelligently
+ attempt to enforce file uniqueness using filename or hash values instead of just relying on a database index.
+</p>
+<h3>GridFS File Attributes</h3>
+<p>
+ <em>PutGridFS</em> allows for flowfile attributes that start with a configured prefix to be added to the GridFS
+ document. These can be very useful later when working with GridFS for providing metadata about a file.
+</p>
+<h3>Chunk Size</h3>
+<p>
+ GridFS splits a file into chunks stored as Mongo documents as the file is ingested into the database. The chunk size
+ configuration parameter configures the maximum size of each chunk. This field should be left at its default value
+ unless there is a specific business case to increase or decrease it.
+</p>
+<h3>Uniqueness Enforcement</h3>
+<p>
+ There are four operating modes:
+</p>
+<ul>
+ <li>No enforcement at the application level.</li>
+ <li>Enforce by unique file name.</li>
+ <li>Enforce by unique hash value.</li>
+ <li>Use both hash and file name.</li>
+</ul>
+<p>
+ The hash value by default is taken from the attribute <em>hash.value</em> which can be generated by configuring a
+ <em>HashContent</em> processor upstream of <em>PutGridFS</em>. Both this and the name option use a query on the existing
+ data to see if a file matching that criteria exists before attempting to write the flowfile contents.
+</p>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java
new file mode 100644
index 0000000..e006ecb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.types.ObjectId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteGridFSIT extends GridFSITTestBase {
+ private TestRunner runner;
+ private static final String BUCKET = "delete_test_bucket";
+
+ @Before
+ public void setup() throws Exception {
+ runner = TestRunners.newTestRunner(DeleteGridFS.class);
+ super.setup(runner, BUCKET, false);
+ }
+
+ @After
+ public void tearDown() {
+ super.tearDown();
+ }
+
+ @Test
+ public void testFileAndQueryAtSameTime() {
+ runner.setProperty(DeleteGridFS.FILE_NAME, "${test_var}");
+ runner.setProperty(DeleteGridFS.QUERY, "{}");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNeitherFileNorQuery() {
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testDeleteByFileName() {
+ testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), setupTestFile());
+ }
+
+ @Test
+ public void testDeleteByQuery() {
+ testDeleteByProperty(DeleteGridFS.QUERY, "{}", setupTestFile());
+ }
+
+ @Test
+ public void testQueryAttribute() {
+ String attrName = "gridfs.query.used";
+ String fileName = setupTestFile();
+ runner.setProperty(DeleteGridFS.QUERY_ATTRIBUTE, attrName);
+ testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), fileName);
+ testForQueryAttribute(fileName, attrName);
+ }
+
+ private void testForQueryAttribute(String mustContain, String attrName) {
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteGridFS.REL_SUCCESS);
+ String attribute = flowFiles.get(0).getAttribute(attrName);
+ Assert.assertTrue(attribute.contains(mustContain));
+ }
+
+ private String setupTestFile() {
+ String fileName = "simple-delete-test.txt";
+ ObjectId id = writeTestFile(fileName, "Hello, world!", BUCKET, new HashMap<>());
+ Assert.assertNotNull(id);
+
+ return fileName;
+ }
+
+ private void testDeleteByProperty(PropertyDescriptor descriptor, String value, String fileName) {
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(CoreAttributes.FILENAME.key(), fileName);
+ runner.setProperty(descriptor, value);
+ runner.assertValid();
+ runner.enqueue("test", attrs);
+ runner.run();
+
+ runner.assertTransferCount(DeleteGridFS.REL_FAILURE, 0);
+ runner.assertTransferCount(DeleteGridFS.REL_SUCCESS, 1);
+
+ Assert.assertFalse(String.format("File %s still exists.", fileName), fileExists(fileName, BUCKET));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java
new file mode 100644
index 0000000..5ce4ff3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.mongodb.QueryHelper;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.types.ObjectId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
 * Integration tests for FetchGridFS against a live MongoDB instance
 * (see GridFSITTestBase for the connection setup). Exercises all three
 * query sources: flowfile body, the Query property, and the File Name property.
 */
public class FetchGridFSIT extends GridFSITTestBase {
    TestRunner runner;

    static final String BUCKET = "get_test_bucket";

    @Before
    public void setup() throws Exception {
        runner = TestRunners.newTestRunner(FetchGridFS.class);
        super.setup(runner, BUCKET, false);
    }

    @After
    public void tearDown() {
        super.tearDown();
    }

    // Fetches a single file first via a query in the flowfile body, then via the Query property.
    @Test
    public void testGetOneByName() {
        final String fileName = "get_by_name.txt";
        final String content = "Hello, world";
        ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
        Assert.assertNotNull(id);

        // Phase 1: query supplied as the flowfile content.
        String query = String.format("{\"filename\": \"%s\"}", fileName);
        runner.enqueue(query);
        runner.run();
        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS);
        byte[] rawData = runner.getContentAsByteArray(flowFiles.get(0));
        Assert.assertEquals("Data did not match for the file", new String(rawData), content);

        // Phase 2: same query supplied via the Query property; reuse the runner after clearing state.
        runner.clearTransferState();
        runner.setProperty(FetchGridFS.QUERY, query);
        runner.enqueue("test");
        runner.run();

        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
        flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS);
        rawData = runner.getContentAsByteArray(flowFiles.get(0));
        Assert.assertEquals("Data did not match for the file", new String(rawData), content);
    }

    // A match-all query must return every stored file in both commit modes.
    @Test
    public void testGetMany() {
        String baseName = "test_file_%d.txt";
        String content = "Hello, world take %d";
        for (int index = 0; index < 5; index++) {
            ObjectId id = writeTestFile(String.format(baseName, index), String.format(content, index), BUCKET, new HashMap<>());
            Assert.assertNotNull(id);
        }

        AllowableValue[] values = new AllowableValue[] { QueryHelper.MODE_MANY_COMMITS, QueryHelper.MODE_ONE_COMMIT };

        for (AllowableValue value : values) {
            String query = "{}";
            runner.setProperty(FetchGridFS.OPERATION_MODE, value);
            runner.enqueue(query);
            runner.run();

            runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
            runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
            runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 5);
            runner.clearTransferState();
        }
    }

    // Verifies the executed query is written to the configured attribute for both
    // file-name-driven and metadata-driven fetches.
    @Test
    public void testQueryAttribute() {
        final String fileName = "get_by_name.txt";
        final String content = "Hello, world";
        ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
        Assert.assertNotNull(id);

        // Phase 1: fetch by file name; the recorded query should reference "filename".
        final String queryAttr = "gridfs.query.used";
        final Map<String, String> attrs = new HashMap<>();
        attrs.put(CoreAttributes.FILENAME.key(), fileName);
        runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
        runner.setProperty(FetchGridFS.QUERY_ATTRIBUTE, queryAttr);
        runner.enqueue(content, attrs);
        runner.run();

        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
        MockFlowFile mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0);
        String attr = mff.getAttribute(queryAttr);
        Assert.assertNotNull("Query attribute was null.", attr);
        Assert.assertTrue("Wrong content.", attr.contains("filename"));

        runner.clearTransferState();

        // Phase 2: fetch by a metadata query; the recorded query should reference "metadata".
        id = writeTestFile(fileName, content, BUCKET, new HashMap<String, Object>(){{
            put("lookupKey", "xyz");
        }});
        Assert.assertNotNull(id);

        String query = "{ \"metadata\": { \"lookupKey\": \"xyz\" }}";

        // Switch the query source: drop FILE_NAME so the Query property takes effect.
        runner.removeProperty(FetchGridFS.FILE_NAME);
        runner.setProperty(FetchGridFS.QUERY, query);
        runner.enqueue(content, attrs);
        runner.run();
        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
        mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0);
        attr = mff.getAttribute(queryAttr);
        Assert.assertNotNull("Query attribute was null.", attr);
        Assert.assertTrue("Wrong content.", attr.contains("metadata"));
    }

    @Test
    public void testGetQueryFromBody() {
        runner.enqueue("{}");
        testQueryFromSource(0, 1, 1);
    }

    @Test
    public void testGetQueryFromQueryParam() {
        runner.setProperty(FetchGridFS.QUERY, "{}");
        runner.enqueue("");
        testQueryFromSource(0, 1, 1);
    }

    @Test
    public void testGetQueryFromFileNameParam() {
        Map<String, String> attr = new HashMap<>();
        attr.put(CoreAttributes.FILENAME.key(), "get_by_name.txt");
        runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
        runner.enqueue("test", attr);
        testQueryFromSource(0, 1, 1);
    }

    /**
     * Seeds the bucket with a known file, runs the (already enqueued/configured)
     * runner, and checks the transfer counts per relationship.
     */
    private void testQueryFromSource(int failure, int original, int success) {
        final String fileName = "get_by_name.txt";
        final String content = "Hello, world";
        ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
        Assert.assertNotNull(id);

        runner.run();
        runner.assertTransferCount(FetchGridFS.REL_FAILURE, failure);
        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, original);
        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, success);
    }
}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java
new file mode 100644
index 0000000..45e7cb2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSBuckets;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.client.gridfs.model.GridFSUploadOptions;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
+import org.apache.nifi.util.TestRunner;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+
+/**
+ * Shared scaffolding for the GridFS integration tests: wires a Mongo client
+ * service onto the supplied TestRunner and provides helpers that read/write
+ * test files directly through the GridFS driver for verification.
+ *
+ * Requires a MongoDB instance on localhost:27017.
+ */
+public class GridFSITTestBase {
+    static final String URI = "mongodb://localhost:27017";
+    static final String DB = "gridfs_test_database";
+    MongoClient client;
+
+    public void setup(TestRunner runner, String bucketName) throws Exception {
+        setup(runner, bucketName, true);
+    }
+
+    /**
+     * Registers and enables a MongoDBControllerService on the runner, points the
+     * processor under test at the given bucket/database, and opens a raw client
+     * for direct verification. Pass validate=false for tests that intentionally
+     * configure an invalid processor.
+     */
+    public void setup(TestRunner runner, String bucketName, boolean validate) throws Exception {
+        MongoDBClientService clientService = new MongoDBControllerService();
+        runner.addControllerService("clientService", clientService);
+        runner.setProperty(AbstractGridFSProcessor.CLIENT_SERVICE, "clientService");
+        runner.setProperty(clientService, MongoDBControllerService.URI, URI);
+        runner.setProperty(AbstractGridFSProcessor.BUCKET_NAME, bucketName);
+        runner.setProperty(AbstractGridFSProcessor.DATABASE_NAME, DB);
+        runner.enableControllerService(clientService);
+        runner.setValidateExpressionUsage(true);
+        if (validate) {
+            runner.assertValid();
+        }
+
+        client = new MongoClient("localhost", 27017);
+    }
+
+    public void tearDown() {
+        client.dropDatabase(DB);
+        client.close();
+    }
+
+    /**
+     * Returns true if a GridFS file with the given filename exists in the bucket.
+     */
+    public boolean fileExists(String name, String bucketName) {
+        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
+        // Typed cursor (the original used a raw type) plus try-with-resources so
+        // the cursor is closed even if hasNext() throws.
+        try (MongoCursor<GridFSFile> it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator()) {
+            return it.hasNext();
+        }
+    }
+
+    /**
+     * Writes a test file into the bucket with the supplied metadata attributes
+     * and returns the ObjectId assigned by GridFS.
+     */
+    public ObjectId writeTestFile(String fileName, String content, String bucketName, Map<String, Object> attrs) {
+        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
+        GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document(attrs));
+        // NOTE(review): getBytes() uses the platform default charset; test data is
+        // ASCII-only here, but consider StandardCharsets.UTF_8 for robustness.
+        ByteArrayInputStream input = new ByteArrayInputStream(content.getBytes());
+        return bucket.uploadFromStream(fileName, input, options);
+    }
+
+    /**
+     * Returns true if the named file exists and its metadata document matches
+     * the supplied map exactly (same size, every entry equal).
+     */
+    public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
+        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
+        try (MongoCursor<GridFSFile> it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator()) {
+            if (!it.hasNext()) {
+                return false;
+            }
+
+            // Typed cursor removes the unchecked (GridFSFile) cast of the original.
+            Document metadata = it.next().getMetadata();
+            if (metadata == null || metadata.size() != attrs.size()) {
+                return false;
+            }
+            for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+                Object val = attrs.get(entry.getKey());
+                if (val == null || !entry.getValue().equals(val)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java
new file mode 100644
index 0000000..dfd7ae0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Integration tests for PutGridFS. Requires a MongoDB instance on
+ * localhost:27017 (see GridFSITTestBase).
+ */
+public class PutGridFSIT extends GridFSITTestBase {
+    TestRunner runner;
+
+    static final String BUCKET = "put_test_bucket";
+
+    @Before
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(PutGridFS.class);
+        // The stored file name comes from the flowfile's "filename" attribute via EL.
+        runner.setProperty(PutGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
+        super.setup(runner, BUCKET);
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    @Test
+    public void testSimplePut() {
+        final String fileName = "simple_test.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+
+        runner.enqueue("12345", attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
+
+        Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
+    }
+
+    @Test
+    public void testWithProperties() {
+        final String fileName = "simple_test_props.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        attrs.put("prop.created_by", "john.smith");
+        attrs.put("prop.created_for", "jane.doe");
+        attrs.put("prop.restrictions", "PHI&PII");
+        attrs.put("prop.department", "Accounting");
+
+        runner.setProperty(PutGridFS.PROPERTIES_PREFIX, "prop");
+        runner.enqueue("12345", attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
+
+        // Expected GridFS metadata: the "prop." prefix is stripped on write.
+        // Plain map replaces the double-brace-initialization anti-pattern of the
+        // original (anonymous HashMap subclass holding an enclosing-instance ref).
+        Map<String, String> expected = new HashMap<>();
+        expected.put("created_by", "john.smith");
+        expected.put("created_for", "jane.doe");
+        expected.put("restrictions", "PHI&PII");
+        expected.put("department", "Accounting");
+
+        Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
+        Assert.assertTrue("File is missing PARENT_PROPERTIES", fileHasProperties(fileName, BUCKET, expected));
+    }
+
+    @Test
+    public void testNoUniqueness() {
+        String fileName = "test_duplicates.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+
+        for (int x = 0; x < 10; x++) {
+            runner.enqueue("Duplicates are ok.", attrs);
+            runner.run();
+        }
+
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 10);
+
+        // With no uniqueness enforcement, all ten writes land in the bucket's
+        // ".files" collection.
+        String bucketName = String.format("%s.files", BUCKET);
+        MongoCollection<Document> files = client.getDatabase(DB).getCollection(bucketName);
+        Document query = Document.parse(String.format("{\"filename\": \"%s\"}", fileName));
+        long count = files.count(query);
+        // assertEquals reports expected vs. actual on failure, unlike assertTrue.
+        Assert.assertEquals("Wrong count", 10, count);
+    }
+
+    @Test
+    public void testFileNameUniqueness() {
+        String fileName = "test_duplicates.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        testUniqueness(attrs, "Hello, world", PutGridFS.UNIQUE_NAME);
+    }
+
+    @Test
+    public void testFileNameAndHashUniqueness() {
+        testHashUniqueness(PutGridFS.UNIQUE_BOTH);
+    }
+
+    @Test
+    public void testHashUniqueness() {
+        testHashUniqueness(PutGridFS.UNIQUE_HASH);
+    }
+
+    @Test
+    public void testChunkSize() {
+        // Mix of "N KB" and "NKB" spellings exercises the data-size parser.
+        String[] chunkSizes = new String[] { "128 KB", "256 KB", "384 KB", "512KB", "768KB", "1024 KB" };
+        StringBuilder sb = new StringBuilder();
+        for (int x = 0; x < 10000; x++) {
+            sb.append("This is a test string used to build up a largish text file.");
+        }
+        final String testData = sb.toString();
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), "big-putgridfs-test-file.txt");
+
+        for (String chunkSize : chunkSizes) {
+            runner.setProperty(PutGridFS.CHUNK_SIZE, chunkSize);
+            runner.enqueue(testData, attrs);
+            runner.run();
+            runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+            runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
+            runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+
+            runner.clearTransferState();
+        }
+
+        // Chunk size may also be resolved via expression language from an attribute.
+        runner.setProperty(PutGridFS.CHUNK_SIZE, "${gridfs.chunk.size}");
+        attrs.put("gridfs.chunk.size", "768 KB");
+        runner.enqueue(testData, attrs);
+        runner.run();
+        runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+    }
+
+    // Seeds a hash attribute matching the content's MD5 and delegates to
+    // testUniqueness with the given uniqueness mode.
+    private void testHashUniqueness(AllowableValue value) {
+        String hashAttr = "hash.value";
+        String fileName = "test_duplicates.txt";
+        String content = "Hello, world";
+        String hash = DigestUtils.md5Hex(content);
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        attrs.put(hashAttr, hash);
+        testUniqueness(attrs, content, value);
+    }
+
+    // Runs the same flowfile five times under the given uniqueness mode and
+    // expects exactly one success and four duplicates.
+    private void testUniqueness(Map<String, String> attrs, String content, AllowableValue param) {
+        runner.setProperty(PutGridFS.ENFORCE_UNIQUENESS, param);
+        for (int x = 0; x < 5; x++) {
+            runner.enqueue(content, attrs);
+            runner.run();
+        }
+
+        runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 4);
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index c731fe4..6fdebf5 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -40,8 +40,8 @@ import java.util.List;
@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
- "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
- "other Mongo-related components."
+ "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
+ "other Mongo-related components."
)
public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
private String uri;