Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/08 21:11:57 UTC

[GitHub] [nifi] turcsanyip commented on a change in pull request #5482: NIFI-9334 Add support for upsert in 'PutMongoRecord'.

turcsanyip commented on a change in pull request #5482:
URL: https://github.com/apache/nifi/pull/5482#discussion_r744988569



##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -57,28 +70,77 @@
         "a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled " +

Review comment:
       Could you please update the documentation?
   It is no longer correct: _"This processor does not support updates, deletes or upserts."_

##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -190,4 +303,49 @@ private List convertArrays(Object[] input) {
 
         return retVal;
     }
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .collect(Collectors.toList())
+            .toArray(new Bson[0]);
+
+        return filters;
+    }
+
+    private boolean updateModeIs(String updateValueToMatch, ProcessContext context, FlowFile flowFile) {

Review comment:
       Maybe rename the parameter to `updateModeToMatch`?

##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -190,4 +303,49 @@ private List convertArrays(Object[] input) {
 
         return retVal;
     }
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .collect(Collectors.toList())
+            .toArray(new Bson[0]);

Review comment:
       `.toArray(Bson[]::new)` could be called directly on the stream instead of `.collect(Collectors.toList())` followed by `.toArray(new Bson[0])`.
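A minimal sketch of the two variants side by side. To keep it compilable without the MongoDB driver, it uses `String` in place of `Bson`; the class name and the `"=?"` mapping are purely illustrative. `Stream.toArray(IntFunction)` allocates the typed array itself, so the intermediate `List` disappears.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ToArrayExample {
    // Collect-then-copy: builds an intermediate List, then copies it into an array.
    static String[] viaList(List<String> fields) {
        return fields.stream()
                .map(f -> f + "=?")
                .collect(Collectors.toList())
                .toArray(new String[0]);
    }

    // Direct: the array constructor reference tells the stream which array type to create.
    static String[] direct(List<String> fields) {
        return fields.stream()
                .map(f -> f + "=?")
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("id", "address.zip");
        System.out.println(String.join(", ", direct(keys)));
    }
}
```

With the driver on the classpath, the same shape would be `.map(...).toArray(Bson[]::new)`.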

##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -135,15 +222,41 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 for (String name : schema.getFieldNames()) {
                     document.put(name, contentMap.get(name));
                 }
-                inserts.add(convertArrays(document));
-                if (inserts.size() == ceiling) {
-                    collection.insertMany(inserts);
-                    added += inserts.size();
-                    inserts = new ArrayList<>();
+                Document readyToUpsert = convertArrays(document);
+
+                WriteModel<Document> writeModel;
+                if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+                    Bson[] filters = buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
+
+                    if (updateModeIs(UPDATE_ONE.getValue(), context, flowFile)) {
+                        writeModel = new UpdateOneModel<>(
+                            Filters.and(filters),
+                            new Document("$set", readyToUpsert),
+                            new UpdateOptions().upsert(true)
+                        );
+                    } else if (updateModeIs(UPDATE_MANY.getValue(), context, flowFile)) {
+                        writeModel = new UpdateManyModel<>(
+                            Filters.and(filters),
+                            new Document("$set", readyToUpsert),
+                            new UpdateOptions().upsert(true)
+                        );
+                    } else {
+                        String flowfileUpdateMode = flowFile.getAttribute("mongo.update.mode");
+                        throw new ProcessException("Unrecognized 'mongo.update.mode' value '" + flowfileUpdateMode + "'");

Review comment:
       An unhandled `ProcessException` simply pushes the FlowFile back to the input queue, where it would just fail again on the next run. It should be routed to `failure` in this case.
   The same applies to the `buildFilters()` method.
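A minimal sketch of the suggested pattern, assuming the mode is resolved from the property value or, for `flowfile-attribute`, from the `mongodb.update.mode` attribute declared in `@ReadsAttribute`. `ProcessException` here is a local stand-in for NiFi's class, and the `Route` enum is a hypothetical stand-in for `session.transfer(flowFile, REL_SUCCESS / REL_FAILURE)`, so the sketch runs without NiFi on the classpath.

```java
import java.util.Map;

public class RouteToFailureSketch {
    // Local stand-in for org.apache.nifi.processor.exception.ProcessException.
    static class ProcessException extends RuntimeException {
        ProcessException(String message) { super(message); }
    }

    // Hypothetical stand-in for transferring to REL_SUCCESS / REL_FAILURE.
    enum Route { SUCCESS, FAILURE }

    // Returns "one" or "many"; reads the FlowFile attribute when the property says so.
    static String resolveUpdateMode(String propertyValue, Map<String, String> attributes) {
        String mode = "flowfile-attribute".equals(propertyValue)
                ? attributes.get("mongodb.update.mode")
                : propertyValue;
        if (!"one".equals(mode) && !"many".equals(mode)) {
            throw new ProcessException("Unrecognized 'mongodb.update.mode' value '" + mode + "'");
        }
        return mode;
    }

    // Catching the exception inside onTrigger lets the FlowFile go to failure
    // instead of being rolled back to the input queue.
    static Route process(String propertyValue, Map<String, String> attributes) {
        try {
            String mode = resolveUpdateMode(propertyValue, attributes);
            // ... build UpdateOneModel / UpdateManyModel for `mode` and write the batch ...
            return Route.SUCCESS;
        } catch (ProcessException e) {
            // In the processor: log the error, then session.transfer(flowFile, REL_FAILURE).
            return Route.FAILURE;
        }
    }

    public static void main(String[] args) {
        System.out.println(process("flowfile-attribute", Map.of("mongodb.update.mode", "many")));
        System.out.println(process("flowfile-attribute", Map.of("mongodb.update.mode", "all")));
    }
}
```

The same `catch` would also cover the exceptions thrown from `buildFilters()`, since they surface through the same call path.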

##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -190,4 +303,49 @@ private List convertArrays(Object[] input) {
 
         return retVal;
     }
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .collect(Collectors.toList())
+            .toArray(new Bson[0]);
+
+        return filters;
+    }
+
+    private boolean updateModeIs(String updateValueToMatch, ProcessContext context, FlowFile flowFile) {
+        String updateMode = context.getProperty(UPDATE_MODE).getValue();
+
+        boolean updateMadeMatches = updateMode.equals(updateValueToMatch)

Review comment:
       Typo: `updateModeMatches`

##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -118,15 +184,36 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 
         final WriteConcern writeConcern = getWriteConcern(context);
 
-        List<Document> inserts = new ArrayList<>();
         int ceiling = context.getProperty(INSERT_COUNT).asInteger();
-        int added   = 0;
+        int written = 0;
         boolean error = false;
 
-        try (final InputStream inStream = session.read(flowFile);
-             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
-            final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
+        boolean ordered = context.getProperty(ORDERED).asBoolean();
+        boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
+
+        Map<String, List<String>> updateKeyFieldPathToFieldChain = new LinkedHashMap<>();
+        if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+            Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*"))
+                .forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put(
+                    updateKeyField,
+                    Arrays.asList(updateKeyField.split("\\."))
+                ));
+        }

Review comment:
       This `updateKeyFieldPathToFieldChain` map is only used in `buildFilters()`.
   The raw property value could be passed in instead, and `fieldChain` could simply be calculated there, which would make the code more readable:
   ```
       private Bson[] buildFilters(String updateKeyFields, Document readyToUpsert) {
           Bson[] filters = Arrays.stream(updateKeyFields.split("\\s*,\\s*"))
                   .map(fieldPath -> {
                       String[] fieldChain = fieldPath.split("\\.");
   
                       Object value = readyToUpsert;
                       ...
   ```
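A fuller sketch of that suggestion, written as a plain loop. To stay self-contained it takes a `Map<String, Object>` instead of `Document`, throws `IllegalArgumentException` instead of `ProcessException`, and returns the `fieldPath -> value` pairs rather than `Bson` filters; in the processor, each entry would become `Filters.eq(fieldPath, value)` and the set would be combined with `Filters.and(...)`. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BuildFiltersSketch {
    // Splits the raw "Update Key Fields" value and resolves each dotted path against the record.
    static Map<String, Object> buildFilterValues(String updateKeyFields, Map<String, Object> readyToUpsert) {
        Map<String, Object> filters = new LinkedHashMap<>();
        // trim() so leading/trailing blanks do not end up in the first/last field path
        for (String fieldPath : updateKeyFields.trim().split("\\s*,\\s*")) {
            String[] fieldChain = fieldPath.split("\\.");

            Object value = readyToUpsert;
            String previousField = null;
            for (String field : fieldChain) {
                if (!(value instanceof Map)) {
                    throw new IllegalArgumentException("field '" + previousField
                            + "' (from field expression '" + fieldPath + "') is not an embedded document");
                }
                value = ((Map<?, ?>) value).get(field);
                if (value == null) {
                    throw new IllegalArgumentException("field '" + field
                            + "' (from field expression '" + fieldPath + "') has no value");
                }
                previousField = field;
            }
            filters.put(fieldPath, value);
        }
        return filters;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of(
                "id", 42,
                "address", Map.of("zip", "1111", "city", "Budapest"));
        System.out.println(buildFilterValues("id, address.zip", doc));
    }
}
```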
   

##########
File path: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -57,28 +70,77 @@
         "a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled " +
         "by the \"Insert Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
         "that MongoDB is not overloaded with too many inserts at once.")
+@ReadsAttribute(
+    attribute = "mongodb.update.mode",
+    description = "Configurable parameter for controlling update mode on a per-flowfile basis." +
+        " Acceptable values are 'one' and 'many' and controls whether a single incoming record should update a single or multiple Mongo documents."
+)
 public class PutMongoRecord extends AbstractMongoProcessor {
     static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
     static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
             .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
 
+    static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update One", "Updates only the first document that matches the query.");
+    static final AllowableValue UPDATE_MANY = new AllowableValue("many", "Update Many", "Updates every document that matches the query.");
+    static final AllowableValue UPDATE_FF_ATTRIBUTE = new AllowableValue("flowfile-attribute", "Use 'mongodb.update.mode' flowfile attribute.",
+        "Use the value of the 'mongodb.update.mode' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
             .name("record-reader")
             .displayName("Record Reader")
             .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
             .identifiesControllerService(RecordReaderFactory.class)
             .required(true)
             .build();
+
     static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder()
             .name("insert_count")
-            .displayName("Insert Batch Size")
-            .description("The number of records to group together for one single insert operation against MongoDB.")
+            .displayName("Batch Size")
+            .description("The number of records to group together for one single insert/upsert operation against MongoDB.")
             .defaultValue("100")
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("ordered")
+            .displayName("Ordered")
+            .description("Perform ordered or unordered operations")
+            .allowableValues("True", "False")
+            .defaultValue("False")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder()
+            .name("bypass-validation")
+            .displayName("Bypass Validation")
+            .description("Bypass schema validation during insert/update")
+            .allowableValues("True", "False")
+            .defaultValue("True")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder()
+            .name("update-key-fields")
+            .displayName("Update Key Fields")
+            .description("Comma separated list of fields that uniquely identifies a document. If this property is set NiFi will attempt an upsert operation on all documents." +

Review comment:
       "uniquely" is not correct: if the key fields uniquely identified a document, `Update Many` mode would not make sense.

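A small in-memory model of why the key fields need not be unique. It mimics `updateOne` / `updateMany` with `upsert(true)` and a single equality filter, applying the changes like `$set`. The `customer`/`status` fields and the class name are hypothetical, and no MongoDB driver is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpdateModeSketch {
    // Applies `changes` to documents where key == value; returns how many were modified.
    static int upsert(List<Map<String, Object>> collection, String key, Object value,
                      Map<String, Object> changes, boolean many) {
        int modified = 0;
        for (Map<String, Object> doc : collection) {
            if (value.equals(doc.get(key))) {
                doc.putAll(changes);      // $set: overwrite only the listed fields
                modified++;
                if (!many) {
                    return modified;      // updateOne stops at the first match
                }
            }
        }
        if (modified == 0) {              // upsert: nothing matched, insert filter field + changes
            Map<String, Object> inserted = new HashMap<>(changes);
            inserted.put(key, value);
            collection.add(inserted);
        }
        return modified;
    }

    static Map<String, Object> doc(String customer) {
        Map<String, Object> d = new HashMap<>();
        d.put("customer", customer);
        d.put("status", "new");
        return d;
    }

    // Two documents share customer "c1", so the key is deliberately not unique.
    static List<Map<String, Object>> sample() {
        List<Map<String, Object>> c = new ArrayList<>();
        c.add(doc("c1"));
        c.add(doc("c1"));
        c.add(doc("c2"));
        return c;
    }

    public static void main(String[] args) {
        System.out.println(upsert(sample(), "customer", "c1", Map.of("status", "done"), false)); // 1
        System.out.println(upsert(sample(), "customer", "c1", Map.of("status", "done"), true));  // 2
    }
}
```

Only with a non-unique key do the two modes behave differently, which is why the property description should not promise uniqueness.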



-- 
This is an automated message from the Apache Git Service.