Posted to issues@nifi.apache.org by MikeThomsen <gi...@git.apache.org> on 2017/09/27 16:19:32 UTC

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

GitHub user MikeThomsen opened a pull request:

    https://github.com/apache/nifi/pull/2180

    Added GetMongoAggregation to support running Mongo aggregations.

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MikeThomsen/nifi NIFI-4429

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2180
    
----
commit b363a2d10daa49e9e3b2a4196dcb73c15ae51753
Author: Mike Thomsen <mi...@gmail.com>
Date:   2017-09-27T15:46:35Z

    Added GetMongoAggregation to support running Mongo aggregations.

----


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 I think all of your changes are in now. I also updated GetMongo to harmonize some of the changes between it and RunMongoAggregation.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    I will review this when I finish reviewing PR 2085. 


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r159282874
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoAggregation.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +public class GetMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    --- End diff --
    
    Getting an error here that Jackson is not available (I didn't see the jackson-*-asl stuff in the POMs). Can you use the com.mongodb.util stuff like JSON.parse()? If not, then I think you get Jackson Databind `com.fasterxml.jackson.databind.ObjectMapper` transitively from nifi-processor-utils. If you need to use the one from the package you have above, then you'll need to add the appropriate dependencies (don't forget to update the License & Notice, I think only the latter applies IIRC), and probably exclude the Databind stuff.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r163883832
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +102,54 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    Should this allow 0 to fetch all batches? Or maybe since it is not required, the description can say that if this property is not specified, then all elements will be returned.
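
The second option in this comment (leave the property unset to mean "no explicit batch size") can be sketched in plain Java. This is only an illustration under stated assumptions, not code from the PR: `BatchSizeSetting` and `parse` are hypothetical names, and a real processor would read the value through its `PropertyDescriptor` rather than from a raw string.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of NiFi or of this pull request.
final class BatchSizeSetting {
    private BatchSizeSetting() {}

    // Returns an empty OptionalInt when the property is unset, so the caller
    // can skip applying a batch size at all. Otherwise returns the positive
    // value. Zero, negative numbers, and non-numeric input are rejected,
    // which mirrors what a positive-integer validator would enforce.
    static OptionalInt parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return OptionalInt.empty();
        }
        final int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Batch Size must be a positive integer: " + raw, e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException("Batch Size must be a positive integer: " + raw);
        }
        return OptionalInt.of(value);
    }
}
```

With this shape, no 0 sentinel is needed: the caller applies a batch size only when the returned value is present.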


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 would you be in a position to review this one soon, by chance?


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @alopresto got time to do that review?


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    It looks like you and @mattyb149 have had very productive conversations around this. I'm a bit out of the loop on it but if Matt can't get to it by tomorrow, I'll do a final review and merge it if it looks good. 


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    +1 LGTM, ran unit tests and a number of scenarios on a live NiFi instance, verified the expected behavior with and without incoming flow files, loop connections, success/original/failure relationships, etc.  Thanks for this addition! Merging to master


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166078811
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +264,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException {
    +        String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue()
    +                : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
    +
    +        FlowFile flowFile = session.create(parent);
    --- End diff --
    
    I just threw a ternary operator in there.
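
The exact change is not quoted in this message. A plausible reading, offered only as an assumption, is that the child flow file is created from the parent when one exists and created without a parent otherwise. The sketch below models that choice with plain Java functional interfaces; `ChildFlowFiles`, `createChild`, and both callback parameters are hypothetical stand-ins for the session calls.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration; F stands in for the flow file type.
final class ChildFlowFiles {
    private ChildFlowFiles() {}

    // parent may be null when the processor runs without an incoming flow file.
    // createFromParent models "create a child of this parent";
    // createOrphan models "create a flow file with no parent".
    static <F> F createChild(F parent, Supplier<F> createOrphan, Function<F, F> createFromParent) {
        return parent != null ? createFromParent.apply(parent) : createOrphan.get();
    }
}
```

For example, `ChildFlowFiles.<String>createChild(null, () -> "new", p -> p + "-child")` takes the no-parent branch, while passing a non-null parent takes the other.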


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    Sorry I haven’t had time to close the loop on this. @alopresto if the suggested changes are in and you can run some tests successfully, then I’m good, thanks!


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166141568
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    --- End diff --
    
    I assume you mean this:
    
    ```
    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
        fileToProcess = session.get();
    
        if (fileToProcess == null) {
            // Incoming connection with no flow file available, do no work (see capability description)
            return;
        }
    }
    ```
    
    Seems easy enough.
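
The guard quoted above reduces to a three-way decision. The sketch below spells it out as a small truth table in plain Java. It is not NiFi code: `TriggerGuard`, `Decision`, and `decide` are hypothetical names, and the two booleans stand in for `context.hasIncomingConnection()` and "`session.get()` returned a non-null flow file".

```java
// Hypothetical illustration of the guard's three outcomes.
final class TriggerGuard {
    private TriggerGuard() {}

    enum Decision { SKIP, RUN_WITH_INPUT, RUN_WITHOUT_INPUT }

    static Decision decide(boolean hasIncomingConnection, boolean flowFileAvailable) {
        if (flowFileAvailable) {
            // A flow file was pulled from the queue: run the query on its behalf.
            return Decision.RUN_WITH_INPUT;
        }
        // No flow file: do nothing if an upstream connection exists (wait for input),
        // otherwise behave like a source processor and run on the schedule.
        return hasIncomingConnection ? Decision.SKIP : Decision.RUN_WITHOUT_INPUT;
    }
}
```

The `SKIP` case corresponds to the early `return` in the quoted snippet.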


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r159286920
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoAggregationTest.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoClientURI;
    +import com.mongodb.client.MongoCollection;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.bson.Document;
    +import org.codehaus.jackson.map.ObjectMapper;
    --- End diff --
    
    See comment above about switching to `com.fasterxml.jackson.databind.ObjectMapper` or using Mongo utils 


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r163884629
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +100,36 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
         static List<PropertyDescriptor> descriptors = new ArrayList<>();
     
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
    --- End diff --
    
    What do you think about the above comment? The description is a bit confusing in the context of RunMongoAggregation.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160451826
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +247,12 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, ProcessContext context, ProcessSession session) throws UnsupportedEncodingException {
    +        FlowFile flowFile = session.create();
    +        flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes("UTF-8")), flowFile);
    --- End diff --
    
    Is the Mongo output always UTF-8? If not you may need a Character Set property (which you can borrow from any number of existing processors).
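
To make the concern concrete: the bytes written to the flow file depend on the charset passed to `getBytes`, so a hard-coded "UTF-8" fixes the output encoding for every user. A minimal sketch with a configurable charset follows, using only the JDK. `PayloadEncoding` and `encode` are hypothetical names, and falling back to UTF-8 when no name is given is an assumption made for the example.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PR.
final class PayloadEncoding {
    private PayloadEncoding() {}

    // Encodes the payload with the named charset, falling back to UTF-8 when
    // no name is supplied. Charset.forName throws an unchecked exception for
    // an illegal or unsupported name, which is left to the caller here.
    static byte[] encode(String payload, String charsetName) {
        Charset charset = (charsetName == null || charsetName.isEmpty())
                ? StandardCharsets.UTF_8
                : Charset.forName(charsetName);
        return payload.getBytes(charset);
    }
}
```

The same two-character Japanese string encodes to a different number of bytes under UTF-8 and UTF-16BE, which is why downstream consumers need to know which charset was used. A later revision quoted in this thread reads a `CHARSET` property before writing the batch.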


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r165994717
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,13 +101,50 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query-attribute")
    --- End diff --
    
    Since this is common to Mongo processors, the name should probably reflect that.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    There's a conflict now, can you rebase? I will try to take a look as soon as I can afterwards...


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r163887649
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java ---
    @@ -135,15 +135,15 @@ public void testValidators() {
             // invalid projection
             runner.setVariable("projection", "{a: x,y,z}");
             runner.setProperty(GetMongo.QUERY, "{a: 1}");
    -        runner.setProperty(GetMongo.PROJECTION, "${projection}");
    +        runner.setProperty(GetMongo.PROJECTION, "{a: z}");
    --- End diff --
    
    I can't immediately tell the reason for this change, since the projection is changed from x,y,z to z. Is this on purpose?


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 We should be good to go now. I just checked in a change that addresses the few minor points left over.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r161004084
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -268,32 +248,29 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                                     String payload = buildBatch(batch, jsonTypeSetting);
                                     writeBatch(payload, context, session);
                                     batch = new ArrayList<>();
    -                            } catch (IOException ex) {
    +                            } catch (Exception ex) {
                                     getLogger().error("Error building batch", ex);
                                 }
                             }
                         }
                         if (batch.size() > 0) {
                             try {
                                 writeBatch(buildBatch(batch, jsonTypeSetting), context, session);
    -                        } catch (IOException ex) {
    +                        } catch (Exception ex) {
                                 getLogger().error("Error sending remainder of batch", ex);
                             }
                         }
                     } else {
                         while (cursor.hasNext()) {
                             flowFile = session.create();
    -                        flowFile = session.write(flowFile, new OutputStreamCallback() {
    -                            @Override
    -                            public void process(OutputStream out) throws IOException {
    -                                String json;
    -                                if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
    -                                    json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
    -                                } else {
    -                                    json = cursor.next().toJson();
    -                                }
    -                                IOUtils.write(json, out);
    +                        flowFile = session.write(flowFile, out -> {
    +                            String json;
    +                            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
    +                                json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
    +                            } else {
    +                                json = cursor.next().toJson();
                                 }
    +                            IOUtils.write(json, out);
    --- End diff --
    
    No, but since you're writing to a flow file at this point, you don't need to use the Mongo API. I was just wondering about files in Mongo with various character sets, so for example if you had a document in Mongo with a string field containing Japanese characters, etc.
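
To make the character-set concern concrete, here is a minimal standalone sketch using only the JDK. It is not code from this PR: the class name `CharsetSketch` and its helper methods are hypothetical, and the standard charsets stand in for whatever a Character Set property would supply. It shows that the same Japanese string yields different bytes, or loses data entirely, depending on the charset used for encoding.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetSketch {

    // Encode a JSON string with an explicitly chosen charset
    // instead of relying on the platform default.
    static byte[] encode(String json, Charset charset) {
        return json.getBytes(charset);
    }

    // True if the text survives an encode/decode cycle in the given charset.
    static boolean roundTrips(String text, Charset charset) {
        return new String(encode(text, charset), charset).equals(text);
    }

    public static void main(String[] args) {
        // Three Japanese characters written as Unicode escapes so the
        // source file itself stays ASCII-only.
        String text = "\u65E5\u672C\u8A9E";

        System.out.println("UTF-8 bytes:      " + encode(text, StandardCharsets.UTF_8).length);
        System.out.println("UTF-16BE bytes:   " + encode(text, StandardCharsets.UTF_16BE).length);
        System.out.println("UTF-8 round trip: " + roundTrips(text, StandardCharsets.UTF_8));
        // ISO-8859-1 cannot represent these characters, so the data is lost.
        System.out.println("Latin-1 round trip: " + roundTrips(text, StandardCharsets.ISO_8859_1));
    }
}
```

As far as I know, `IOUtils.write(String, OutputStream)` without a charset argument falls back to the platform default encoding, so passing the charset explicitly is the safer choice when documents may contain non-ASCII text.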


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r159285323
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoAggregation.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +public class GetMongoAggregation extends AbstractMongoProcessor {
    --- End diff --
    
    Without an InputRequirement annotation here, the processor defaults to INPUT_ALLOWED, but you are not using any flow file(s) from the session. If you want to prohibit input, please add an InputRequirement(INPUT_FORBIDDEN) annotation to the class. Otherwise, if you want to use incoming flow files (as a trigger, or as a source of flow file attributes for the query, for example), you might consider changing the name (so the behavior isn't confused with other Get processors) and checking for an available flow file. Either is fine with me. If aggregation queries might be plentiful and varied, I'd lean toward the input-based approach; if one aggregate query per instance is the intended use case, then I'm fine with the input-forbidden approach :)


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    Yes, will review shortly.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @MikeThomsen just as a heads-up, we've not forgotten about this. Mattyb picked up the flu that seems to be running wild.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166111208
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,13 +101,50 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query-attribute")
    --- End diff --
    
    Done.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 @markap14 @milanchandna Do any of you have some time to take a quick look and see if this can get merged?


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r169402684
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.RunMongoAggregation/additionalDetails.html ---
    @@ -0,0 +1,44 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<head>
    +    <meta charset="utf-8" />
    +    <title>GetKafka</title>
    --- End diff --
    
    Done


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @joewitt I had it two weeks ago, so I understand.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r163883982
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +102,54 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query-attribute")
    +            .displayName("Query Output Attribute")
    +            .description("If set, the query will be written to a specified attribute on the output flowfiles.")
    +            .expressionLanguageSupported(true)
    +            .addValidator(Validator.VALID)
    --- End diff --
    
    Since this has to be a valid attribute name, I recommend changing this to `StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR`.
    
    Also, since you may be writing this attribute in GetMongo and RunMongoAggregation, there should be a WritesAttribute annotation on both processors (the same goes for mime.type or any other attribute that is written).


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142458671
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -102,8 +107,26 @@
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    --- End diff --
    
    I know, but I didn't write the class and it's been around for a while. I wouldn't want to risk breaking others' flows over that.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mbolka I think the PMC are backlogged. If you really need this, try asking someone on the dev mailing list to jump in on it. I'll have time in the next day or two to make any updates they request in order to get it merged.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160449815
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when it succeeds.")
    +            .name("Original")
    +            .build();
    +
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_ORIGINAL);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
    +        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
    +
    +        MongoCollection collection = getCollection(context);
    +        MongoCursor iter = null;
    +
    +        try {
    +            List<Bson> aggQuery = buildAggregationQuery(query);
    +            AggregateIterable it = collection.aggregate(aggQuery);
    +            it.batchSize(batchSize != null ? batchSize : 1);
    +
    +            iter = it.iterator();
    +            List batch = new ArrayList();
    +
    +            while (iter.hasNext()) {
    +                batch.add(iter.next());
    +                if (batch.size() == resultsPerFlowfile) {
    +                    writeBatch(buildBatch(batch), context, session);
    +                    batch = new ArrayList();
    +                }
    +            }
    +
    +            if (batch.size() > 0) {
    +                writeBatch(buildBatch(batch), context, session);
    +            }
    +
    +            session.transfer(flowFile, REL_ORIGINAL);
    --- End diff --
    
    Does this fail if flowFile is null, or is it a no-op? In the former case, you can wrap it in a null check.
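
The null check suggested above could look roughly like the sketch below. It is standalone and uses hypothetical stand-in types: `FakeSession` is NOT the NiFi `ProcessSession`, and its choice to throw on a null flow file is an assumption made only to show why the guard is useful when the processor runs without an incoming flow file.

```java
import java.util.ArrayList;
import java.util.List;

public class NullGuardSketch {

    // Hypothetical stand-in for a session; it is not the NiFi API.
    // It is assumed here to reject null flow files.
    static class FakeSession {
        final List<String> transferred = new ArrayList<>();

        void transfer(String flowFile, String relationship) {
            if (flowFile == null) {
                throw new IllegalArgumentException("flowFile must not be null");
            }
            transferred.add(flowFile + " -> " + relationship);
        }
    }

    // Route the input to "original" only when there actually was an input,
    // since input is allowed but not required.
    static void routeOriginal(FakeSession session, String flowFile) {
        if (flowFile != null) {
            session.transfer(flowFile, "original");
        }
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        routeOriginal(session, null);      // no input: nothing to transfer
        routeOriginal(session, "input-1"); // input present: transferred
        System.out.println(session.transferred);
    }
}
```

With the guard in place, the question of whether transferring a null flow file fails or is a no-op no longer matters for the timer-driven case.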


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160452108
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -268,32 +248,29 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                                     String payload = buildBatch(batch, jsonTypeSetting);
                                     writeBatch(payload, context, session);
                                     batch = new ArrayList<>();
    -                            } catch (IOException ex) {
    +                            } catch (Exception ex) {
                                     getLogger().error("Error building batch", ex);
                                 }
                             }
                         }
                         if (batch.size() > 0) {
                             try {
                                 writeBatch(buildBatch(batch, jsonTypeSetting), context, session);
    -                        } catch (IOException ex) {
    +                        } catch (Exception ex) {
                                 getLogger().error("Error sending remainder of batch", ex);
                             }
                         }
                     } else {
                         while (cursor.hasNext()) {
                             flowFile = session.create();
    -                        flowFile = session.write(flowFile, new OutputStreamCallback() {
    -                            @Override
    -                            public void process(OutputStream out) throws IOException {
    -                                String json;
    -                                if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
    -                                    json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
    -                                } else {
    -                                    json = cursor.next().toJson();
    -                                }
    -                                IOUtils.write(json, out);
    +                        flowFile = session.write(flowFile, out -> {
    +                            String json;
    +                            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
    +                                json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
    +                            } else {
    +                                json = cursor.next().toJson();
                                 }
    +                            IOUtils.write(json, out);
    --- End diff --
    
    Similar comment about character sets here: will a user want the flow file in a different encoding? If so, you may need that Character Set property here instead.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r163884942
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +267,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException {
    +        String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue()
    +                : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
    +
    +        FlowFile flowFile = session.create(parent);
    +        flowFile = session.importFrom(new ByteArrayInputStream(payload.getBytes(charset)), flowFile);
    +        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    --- End diff --
    
    THANK YOU for including this :) So many processors that output a specific format (even converters!) do not set the mime.type attribute, which makes viewing their output from the UI a pain.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142912872
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +244,17 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    +        FlowFile flowFile = session.create();
    +        flowFile = session.write(flowFile, new OutputStreamCallback() {
    +            @Override
    +            public void process(OutputStream out) throws IOException {
    +                out.write(payload.getBytes("UTF-8"));
    --- End diff --
    
    That's a good catch. I made that change.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2180


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 @markap14 @milanchandna Did some cleanup and rebased against 1.5.0-SNAPSHOT.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166075574
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    --- End diff --
    
    That's a fair point, since the description doesn't allow pretty printing. However, it would make a good section in an additionalDetails.html for the overall documentation, and the description here could refer to that Additional Details section.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by milanchandna <gi...@git.apache.org>.
Github user milanchandna commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142060366
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -102,8 +107,26 @@
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    --- End diff --
    
    Generally it's preferred to add a displayName and to keep the name free of whitespace, e.g. "batch-size".


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160447460
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +100,36 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
         static List<PropertyDescriptor> descriptors = new ArrayList<>();
     
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
    --- End diff --
    
    You may not want this in the Abstract class as the description may change based on who is using it (see my other comments). In this case, the stock text description is common to many Get processors because if any flow files are produced, they are routed to success. In RunMongoAggregation, if there is an incoming flow file, then it is not routed to success; rather the results of a successful aggregation are written to success, and the original is written to "original", and on failure, the incoming flow file should be transferred to failure.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by milanchandna <gi...@git.apache.org>.
Github user milanchandna commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142060410
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -102,8 +107,26 @@
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    --- End diff --
    
    Do you want to add a default value of 1? That would let you drop the null check in the onTrigger method.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r169360501
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.RunMongoAggregation/additionalDetails.html ---
    @@ -0,0 +1,44 @@
    +<!DOCTYPE html>
    +<html lang="en">
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements.  See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License.  You may obtain a copy of the License at
    +      http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<head>
    +    <meta charset="utf-8" />
    +    <title>GetKafka</title>
    --- End diff --
    
    Should be RunMongoAggregation, not that you'd ever really see the title of this page. I'll update on merge


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by milanchandna <gi...@git.apache.org>.
Github user milanchandna commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142060479
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoAggregation.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +public class GetMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
    --- End diff --
    
    Can REL_SUCCESS be used from AbstractMongoProcessor?


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    Ok Monday I will ;)


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 Any chance we can get this merged?


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166073173
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    --- End diff --
    
    I'm not sure the example would be very readable given how the description fields are formatted. Here is a really minimal example in case you've never seen one:
    ```
    [{
        "$project": {
            "domain": 1
        }
    }, {
        "$group": {
            "_id": { "domain": "$domain" },
            "total": {
                "$sum": 1
            }
        }
    }]
    ```
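
Each stage object in a MongoDB aggregation pipeline holds exactly one operator such as `$project` or `$group`. As a rough illustration only, the sketch below checks that shape on an already-parsed pipeline using plain `java.util` collections. `PipelineStageCheck` and `invalidStages` are hypothetical names that are not part of this PR, and the check is far weaker than what the server itself enforces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: inspects the shape of a parsed pipeline.
final class PipelineStageCheck {

    // Returns the indexes of stages that do not consist of exactly one "$"-prefixed key.
    static List<Integer> invalidStages(List<Map<String, Object>> pipeline) {
        List<Integer> bad = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Map<String, Object> stage = pipeline.get(i);
            boolean oneOperator = stage.size() == 1
                    && stage.keySet().iterator().next().startsWith("$");
            if (!oneOperator) {
                bad.add(i);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // First stage: keep only the "domain" field.
        Map<String, Object> project = new LinkedHashMap<>();
        project.put("$project", Collections.singletonMap("domain", 1));

        // Second stage: count documents per domain.
        Map<String, Object> groupBody = new LinkedHashMap<>();
        groupBody.put("_id", Collections.singletonMap("domain", "$domain"));
        groupBody.put("total", Collections.singletonMap("$sum", 1));
        Map<String, Object> group = new LinkedHashMap<>();
        group.put("$group", groupBody);

        List<Map<String, Object>> pipeline = Arrays.asList(project, group);
        System.out.println("Invalid stage indexes: " + invalidStages(pipeline));
    }
}
```

A check along these lines could in principle sit next to the JSON parsing done by the validator, but whether that is worthwhile is a design decision for the processor's author.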


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by milanchandna <gi...@git.apache.org>.
Github user milanchandna commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142060476
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +244,17 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    +        FlowFile flowFile = session.create();
    +        flowFile = session.write(flowFile, new OutputStreamCallback() {
    +            @Override
    +            public void process(OutputStream out) throws IOException {
    +                out.write(payload.getBytes("UTF-8"));
    --- End diff --
    
    The session.importFrom method can save you from writing the callback: convert the String payload to a byte InputStream and pass it to importFrom.
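
The conversion half of this suggestion needs only the JDK, so it can be sketched in isolation. `PayloadStreams` and `toUtf8Stream` are hypothetical names chosen for illustration. The hand-off to NiFi appears only as a comment, on the assumption that a `ProcessSession` called `session` and a `FlowFile` called `flowFile` are in scope inside the processor.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PR.
final class PayloadStreams {

    // Wraps the payload's UTF-8 bytes in an in-memory stream.
    static ByteArrayInputStream toUtf8Stream(String payload) {
        return new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteArrayInputStream in = toUtf8Stream("[{\"total\": 3}]");
        // Inside the processor (assumed names), the stream would replace the callback:
        //   flowFile = session.importFrom(in, flowFile);
        System.out.println(in.available() + " bytes ready to import");
    }
}
```

Using `StandardCharsets.UTF_8` rather than the string `"UTF-8"` also avoids the checked `UnsupportedEncodingException` that `getBytes(String)` declares.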


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166003849
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
    +        String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
    +        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
    +
    +        Map attrs = new HashMap();
    +        if (queryAttr != null && queryAttr.trim().length() > 0) {
    +            attrs.put(queryAttr, query);
    +        }
    +
    +        MongoCollection collection = getCollection(context);
    +        MongoCursor iter = null;
    +
    +        try {
    +            List<Bson> aggQuery = buildAggregationQuery(query);
    +            AggregateIterable it = collection.aggregate(aggQuery);
    +            it.batchSize(batchSize != null ? batchSize : 1);
    +
    +            iter = it.iterator();
    +            List batch = new ArrayList();
    +
    +            while (iter.hasNext()) {
    +                batch.add(iter.next());
    +                if (batch.size() == resultsPerFlowfile) {
    +                    writeBatch(buildBatch(batch), null, context, session, attrs, REL_RESULTS);
    +                    batch = new ArrayList();
    +                }
    +            }
    +
    +            if (batch.size() > 0) {
    +                writeBatch(buildBatch(batch), null, context, session, attrs, REL_RESULTS);
    --- End diff --
    
    Same comment as above for hardcoding null as the parent FF reference


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160449596
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when it succeeds.")
    +            .name("Original")
    +            .build();
    +
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_ORIGINAL);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
    +        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
    +
    +        MongoCollection collection = getCollection(context);
    +        MongoCursor iter = null;
    +
    +        try {
    +            List<Bson> aggQuery = buildAggregationQuery(query);
    +            AggregateIterable it = collection.aggregate(aggQuery);
    +            it.batchSize(batchSize != null ? batchSize : 1);
    +
    +            iter = it.iterator();
    +            List batch = new ArrayList();
    +
    +            while (iter.hasNext()) {
    +                batch.add(iter.next());
    +                if (batch.size() == resultsPerFlowfile) {
    +                    writeBatch(buildBatch(batch), context, session);
    +                    batch = new ArrayList();
    +                }
    +            }
    +
    +            if (batch.size() > 0) {
    +                writeBatch(buildBatch(batch), context, session);
    +            }
    +
    +            session.transfer(flowFile, REL_ORIGINAL);
    +        } catch (IOException i) {
    +            context.yield();
    --- End diff --
    
    If there is an input flow file, an error should be logged and the flow file transferred to a different relationship rather than rolled back. Depending on the kinds of errors that can occur, you may want a "retry" relationship (along with possible penalization) if the same flow file might succeed later (e.g., due to connection issues), or "failure" if the flow file will always fail due to bad data, format, etc.
    
    If there is no input flow file, a yield and rollback should be OK, I think.
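
    The routing decision described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class name, the Outcome enum, and the rule that socket-level exceptions count as transient are all hypothetical and not part of the PR or of NiFi. In a real processor each outcome would map to a session.transfer(...) to the matching relationship, or to context.yield() plus session.rollback().

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;

// Hypothetical sketch: none of these names come from the PR or from NiFi.
final class FailureRoutingSketch {

    enum Outcome { FAILURE, RETRY, YIELD_AND_ROLLBACK }

    // Decide what to do with an error based on whether an input flow file exists.
    static Outcome route(boolean hasInputFlowFile, Exception error) {
        if (!hasInputFlowFile) {
            // Nothing to route, so back off and let the framework try again later.
            return Outcome.YIELD_AND_ROLLBACK;
        }
        // Assumption: socket-level problems are transient and worth retrying.
        if (error instanceof SocketException || error instanceof SocketTimeoutException) {
            return Outcome.RETRY;
        }
        // Everything else (e.g. malformed query JSON) is treated as permanent.
        return Outcome.FAILURE;
    }

    public static void main(String[] args) {
        System.out.println(route(false, new IOException("no input")));
        System.out.println(route(true, new ConnectException("server unreachable")));
        System.out.println(route(true, new IOException("malformed query JSON")));
    }
}
```

    Which exceptions really are transient depends on the driver in use, so the instanceof checks here are a placeholder for that classification.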


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r165994803
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,13 +101,50 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query-attribute")
    +            .displayName("Query Output Attribute")
    +            .description("If set, the query will be written to a specified attribute on the output flowfiles.")
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +            .required(false)
    +            .build();
    +    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("el5-charset")
    --- End diff --
    
    Looks like a copy-paste name; perhaps add a mongo prefix here?


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166111166
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -152,21 +146,25 @@ public ValidationResult validate(final String subject, final String value, final
                 .displayName("JSON Type")
                 .name("json-type")
                 .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" +
    -            " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
    -            " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
    +                    " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
    +                    " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
                 .expressionLanguageSupported(false)
                 .required(true)
                 .build();
     
         private final static Set<Relationship> relationships;
         private final static List<PropertyDescriptor> propertyDescriptors;
     
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
    +
         static {
             List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
             _propertyDescriptors.addAll(descriptors);
             _propertyDescriptors.add(JSON_TYPE);
             _propertyDescriptors.add(USE_PRETTY_PRINTING);
    +        _propertyDescriptors.add(CHARSET);
             _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    --- End diff --
    
    Added it.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by alopresto <gi...@git.apache.org>.
Github user alopresto commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142480211
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -102,8 +107,26 @@
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    --- End diff --
    
    In this case, the guidance for a new `PropertyDescriptor` in an existing class (or new class) is to add a `displayName` value and provide a "machine-readable" (i.e. no space) value for `name`. When modifying an *existing* `PropertyDescriptor`, adding a `displayName` value is encouraged, but modifying the `name` will break backward compatibility with existing flows. 
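
    As a rough illustration of what a "machine-readable" name might look like, the sketch below derives one from a display name. The helper toMachineName and its class are hypothetical and not part of NiFi; it only applies when defining a *new* property, since renaming an existing one is exactly the compatibility break described above.

```java
import java.util.Locale;

// Hypothetical helper, not part of NiFi: derives a space-free name from a display name.
final class PropertyNameSketch {

    static String toMachineName(String displayName) {
        // Lower-case with a fixed locale so the result does not vary by platform.
        String lowered = displayName.trim().toLowerCase(Locale.ROOT);
        // Collapse every run of non-alphanumeric characters into a single hyphen.
        String hyphenated = lowered.replaceAll("[^a-z0-9]+", "-");
        // Drop any hyphens left at the start or end.
        return hyphenated.replaceAll("^-+|-+$", "");
    }

    public static void main(String[] args) {
        System.out.println(toMachineName("Batch Size"));           // batch-size
        System.out.println(toMachineName("Results Per FlowFile")); // results-per-flowfile
    }
}
```

    The second call reproduces the `results-per-flowfile` name already used in this diff, which follows the same convention.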


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 If you get a chance, could you take a look? I just rebased it against master.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by milanchandna <gi...@git.apache.org>.
Github user milanchandna commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    Yes, I reviewed it and the changes look good. But I am a new contributor myself, so IMO you should wait for an expert review before this gets merged.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166001750
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +264,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException {
    +        String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue()
    +                : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
    +
    +        FlowFile flowFile = session.create(parent);
    --- End diff --
    
    Need to check whether parent is null before passing it to session.create(). A unit test with no incoming flow file will illustrate this.
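
    A minimal sketch of the null check, assuming the session offers both a no-argument create() and a create(parent) overload, as NiFi's ProcessSession does. To keep the example runnable without NiFi on the classpath, the Session interface below is a hypothetical stand-in for those two overloads; it is not the real API type.

```java
// Hypothetical sketch: Session stands in for the two create overloads on ProcessSession.
final class NullSafeCreateSketch {

    interface Session<F> {
        F create();          // new flow file with no parent
        F create(F parent);  // new flow file derived from a parent
    }

    // Only pass the parent along when there actually is one.
    static <F> F createChild(Session<F> session, F parent) {
        return parent != null ? session.create(parent) : session.create();
    }

    public static void main(String[] args) {
        Session<String> session = new Session<String>() {
            @Override
            public String create() {
                return "new flow file";
            }

            @Override
            public String create(String parent) {
                return "child of " + parent;
            }
        };
        System.out.println(createChild(session, null));
        System.out.println(createChild(session, "input"));
    }
}
```

    In writeBatch the same ternary would replace the unconditional session.create(parent) call.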


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166079130
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
    +        String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
    +        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
    +
    +        Map attrs = new HashMap();
    +        if (queryAttr != null && queryAttr.trim().length() > 0) {
    +            attrs.put(queryAttr, query);
    +        }
    +
    +        MongoCollection collection = getCollection(context);
    +        MongoCursor iter = null;
    +
    +        try {
    +            List<Bson> aggQuery = buildAggregationQuery(query);
    +            AggregateIterable it = collection.aggregate(aggQuery);
    +            it.batchSize(batchSize != null ? batchSize : 1);
    +
    +            iter = it.iterator();
    +            List batch = new ArrayList();
    +
    +            while (iter.hasNext()) {
    +                batch.add(iter.next());
    +                if (batch.size() == resultsPerFlowfile) {
    +                    writeBatch(buildBatch(batch), null, context, session, attrs, REL_RESULTS);
    --- End diff --
    
    That might have been a mistake on my part that came from doing the rebase against a recent change to GetMongo.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r142563421
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoAggregation.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import org.codehaus.jackson.map.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +public class GetMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
    --- End diff --
    
    Yes, good catch


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160448342
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    --- End diff --
    
    Consider adding @EventDriven here, although if you do, you should update the capability description to say it may also run the query upon receiving an input flow file.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166111534
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    --- End diff --
    
    Ok. I'll take a look at that, and when I've got a solution I'll push an update.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160452672
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +100,36 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
         static List<PropertyDescriptor> descriptors = new ArrayList<>();
    --- End diff --
    
    Do you think the user would want to have the aggregation query as an attribute in the outgoing results flow file? This came up with ExecuteSQL a little while ago, and I thought for that one (and perhaps this one) we could provide an optional property for a Query Attribute or something, and if it was populated we would store the executed query in the provided attribute name. Thoughts?
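
As a rough illustration of the optional Query Attribute idea in the comment above, the sketch below computes the extra attributes for a results flowfile. It is not the PR's code: the class and method names are hypothetical, and a plain Map stands in for flowfile attributes so that the example runs without NiFi on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: decides which extra attributes
// a results flowfile would carry.
final class QueryAttributeSketch {

    // If the optional attribute name is populated, store the executed query
    // under that name; otherwise add nothing.
    static Map<String, String> resultAttributes(String queryAttributeName, String executedQuery) {
        Map<String, String> attrs = new HashMap<>();
        if (queryAttributeName != null && !queryAttributeName.trim().isEmpty()) {
            attrs.put(queryAttributeName, executedQuery);
        }
        return attrs;
    }
}
```

In a processor, the returned map could presumably be applied to the outgoing flowfile with session.putAllAttributes(flowFile, attrs).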


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 @alopresto I think all of the changes are in now. Any chance I could get this merged?


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 We should be good to go now. I ran it with GenerateFlowFile and got it to execute an aggregation query.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 Changes are in and it's building.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r164106059
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +102,54 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    I think 0 would be confusing, so leaving it blank is the best bet. I should put some text in there to explain that.
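
One possible reading of "leaving it blank" is that the batch size is applied only when the property has a value. The sketch below assumes that reading; the class and method names are hypothetical and it uses only the JDK.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of the PR: treats a blank Batch Size
// property as "not set" instead of using 0 as a sentinel value.
final class BatchSizeSketch {

    static OptionalInt parseBatchSize(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return OptionalInt.empty(); // blank: leave the driver default alone
        }
        return OptionalInt.of(Integer.parseInt(raw.trim()));
    }
}
```

The caller would then set the batch size on the aggregation (for example via AggregateIterable.batchSize) only when the OptionalInt is present.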


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r169371909
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = null;
    +        if (context.hasIncomingConnection()) {
    +            flowFile = session.get();
    +
    +            if (flowFile == null) {
    --- End diff --
    
    You're not checking context.hasNonLoopConnection() here; is that on purpose? If I have no upstream connections but connect Failure back to the processor, it won't run my aggregation. If this is on purpose, please mention it in the relationship description(s) and/or the main capability description. Otherwise, you can add hasNonLoopConnection() (see ExecuteSQL for an example with comments). I prefer the latter, but it is up to you.
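
The decision described in the comment above can be sketched as a small truth function. This is a simplified model rather than the processor code: the three booleans stand in for context.hasIncomingConnection(), context.hasNonLoopConnection() and "session.get() returned a flowfile", and the class name is hypothetical.

```java
// Hypothetical model of the onTrigger guard; booleans replace the NiFi calls
// so the logic can be run without NiFi on the classpath.
final class TriggerDecisionSketch {

    static boolean shouldRun(boolean hasIncomingConnection,
                             boolean hasNonLoopConnection,
                             boolean flowFilePresent) {
        // No flowfile, but another processor feeds this one: wait for input.
        if (hasIncomingConnection && !flowFilePresent && hasNonLoopConnection) {
            return false;
        }
        // Either a flowfile arrived, there are no incoming connections at all,
        // or every incoming connection is a self-loop (e.g. Failure wired back).
        return true;
    }
}
```

With this guard, a processor whose only incoming connection is its own Failure relationship would still run the aggregation when the loop queue is empty.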


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    OK. I did the merge, and both GetMongoTest and RunMongoAggregationTest passed (I remembered to comment out the @Ignore annotations...)


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @alopresto Can you merge this?


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166088202
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    --- End diff --
    
    I'll look for an example of that then.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166080619
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +264,16 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException {
    +        String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue()
    +                : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
    +
    +        FlowFile flowFile = session.create(parent);
    --- End diff --
    
    The ternary operator for charset is fine; I'm referring to the session.create(parent) call. If parent is null, an NPE will be thrown when it tries to get the parent's attributes.
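
A minimal sketch of the null guard this comment points toward is shown below. The helper name is hypothetical, and the two functional parameters stand in for ProcessSession.create() and ProcessSession.create(FlowFile) so that the example compiles with only the JDK.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR: picks the right way to create the
// output flowfile depending on whether there is a parent.
final class CreateChildSketch {

    static <F> F createChild(F parent, Supplier<F> createFresh, Function<F, F> createFromParent) {
        // Only derive from the parent when there is one; otherwise start fresh.
        return parent != null ? createFromParent.apply(parent) : createFresh.get();
    }
}
```

In writeBatch, the equivalent inline form would be parent != null ? session.create(parent) : session.create().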


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160450126
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationTest.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.mongodb.MongoClient;
    +import com.mongodb.MongoClientURI;
    +import com.mongodb.client.MongoCollection;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.bson.Document;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Calendar;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Ignore("This is an integration test that requires Mongo to be running.")
    +public class RunMongoAggregationTest {
    +
    +    private static final String MONGO_URI = "mongodb://localhost";
    +    private static final String DB_NAME   = String.format("agg_test-%s", Calendar.getInstance().getTimeInMillis());
    +    private static final String COLLECTION_NAME = "agg_test_data";
    +
    +    private TestRunner runner;
    +    private MongoClient mongoClient;
    +    private Map<String, Integer> mappings;
    +
    +    @Before
    +    public void setup() {
    +        runner = TestRunners.newTestRunner(RunMongoAggregation.class);
    +        runner.setVariable("uri", MONGO_URI);
    +        runner.setVariable("db", DB_NAME);
    +        runner.setVariable("collection", COLLECTION_NAME);
    +        runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
    +        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
    +        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
    +
    +        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
    +
    +        MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
    +        String[] values = new String[] { "a", "b", "c" };
    +        mappings = new HashMap<>();
    +
    +        for (int x = 0; x < values.length; x++) {
    +            for (int y = 0; y < x + 2; y++) {
    +                Document doc = new Document().append("val", values[x]);
    +                collection.insertOne(doc);
    +            }
    +            mappings.put(values[x], x + 2);
    +        }
    +    }
    +
    +    @After
    +    public void teardown() {
    +        runner = null;
    +
    +        mongoClient.getDatabase(DB_NAME).drop();
    +    }
    +
    +    @Test
    +    public void testAggregation() throws Exception {
    +        runner.setProperty(RunMongoAggregation.QUERY, "[\n" +
    +                "    {\n" +
    +                "        \"$project\": {\n" +
    +                "            \"_id\": 0,\n" +
    +                "            \"val\": 1\n" +
    +                "        }\n" +
    +                "    },\n" +
    +                "    {\n" +
    +                "        \"$group\": {\n" +
    +                "            \"_id\": \"$val\",\n" +
    +                "            \"doc_count\": {\n" +
    +                "                \"$sum\": 1\n" +
    +                "            }\n" +
    +                "        }\n" +
    +                "    }\n" +
    +                "]");
    +        runner.enqueue("test");
    +        runner.run(1, true, true);
    +
    +        evaluateRunner();
    +    }
    +
    +    @Test
    +    public void testExpressionLanguageSupport() throws Exception {
    +        runner.setVariable("fieldName", "$val");
    +        runner.setProperty(RunMongoAggregation.QUERY, "[\n" +
    +                "    {\n" +
    +                "        \"$project\": {\n" +
    +                "            \"_id\": 0,\n" +
    +                "            \"val\": 1\n" +
    +                "        }\n" +
    +                "    },\n" +
    +                "    {\n" +
    +                "        \"$group\": {\n" +
    +                "            \"_id\": \"${fieldName}\",\n" +
    +                "            \"doc_count\": {\n" +
    +                "                \"$sum\": 1\n" +
    +                "            }\n" +
    +                "        }\n" +
    +                "    }\n" +
    +                "]");
    +
    +        runner.setValidateExpressionUsage(true);
    +        runner.enqueue("test");
    +        runner.run(1, true, true);
    +        evaluateRunner();
    +    }
    +
    +    private void evaluateRunner() throws IOException {
    --- End diff --
    
    Please add unit test(s) around various failure conditions where prudent (perhaps with a mock on the connection or whatever).


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r165995402
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    --- End diff --
    
    An example here would be quite helpful.
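
    For instance, the description could embed a small pipeline. The sketch below assumes the property takes a JSON array of stage objects, which is what buildAggregationQuery parses in this diff. The class name and the "status", "category" and "total" field names are made up for illustration.

```java
public class QueryDescriptionExample {
    // Hypothetical pipeline; the field names are illustrative only.
    static final String EXAMPLE_PIPELINE =
            "[{\"$match\": {\"status\": \"active\"}}, "
            + "{\"$group\": {\"_id\": \"$category\", \"total\": {\"$sum\": 1}}}]";

    // Text that could be handed to PropertyDescriptor.Builder.description(...).
    static String describeQuery() {
        return "The aggregation query to be executed, written as a JSON array of "
                + "pipeline stages. Example: " + EXAMPLE_PIPELINE;
    }

    public static void main(String[] args) {
        System.out.println(describeQuery());
    }
}
```

    The exact wording is a suggestion only; the point is that the description shows the expected array-of-stages shape.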


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160446538
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -221,4 +247,12 @@ protected WriteConcern getWriteConcern(final ProcessContext context) {
             }
             return writeConcern;
         }
    +
    +    protected void writeBatch(String payload, ProcessContext context, ProcessSession session) throws UnsupportedEncodingException {
    +        FlowFile flowFile = session.create();
    --- End diff --
    
    We should pass in a reference to the parent flow file (or null if there isn't one). GetMongo does not allow input, so there is never a parent, hence the session.create(). However, in RunMongoAggregation, if there is a parent flow file it should be passed to session.create(parentFlowFile) to maintain correct provenance. I believe session.create(null) works fine, so you shouldn't need a check.
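
    Roughly what I have in mind, as a sketch only. FakeFlowFile and FakeSession are stand-ins invented here so the snippet compiles on its own; they are not the NiFi classes. The explicit null check is a defensive assumption in case create(null) does not behave as I expect.

```java
import java.util.ArrayList;
import java.util.List;

public class ParentAwareCreate {
    /** Stand-in for a flow file; it only remembers its parent. */
    static final class FakeFlowFile {
        final FakeFlowFile parent;

        FakeFlowFile(FakeFlowFile parent) {
            this.parent = parent;
        }
    }

    /** Stand-in for the session; it records every flow file it creates. */
    static final class FakeSession {
        final List<FakeFlowFile> created = new ArrayList<>();

        FakeFlowFile create() {
            FakeFlowFile ff = new FakeFlowFile(null);
            created.add(ff);
            return ff;
        }

        FakeFlowFile create(FakeFlowFile parent) {
            FakeFlowFile ff = new FakeFlowFile(parent);
            created.add(ff);
            return ff;
        }
    }

    // Link the result to the incoming flow file when there is one,
    // otherwise fall back to the parentless create().
    static FakeFlowFile createResult(FakeSession session, FakeFlowFile parent) {
        return parent == null ? session.create() : session.create(parent);
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        FakeFlowFile input = session.create();
        System.out.println(createResult(session, input).parent == input);
    }
}
```

    With the parent passed through, writeBatch can serve both GetMongo (no input) and RunMongoAggregation (optional input).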
      


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166110849
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    --- End diff --
    
    Ok, borrowed an example from the Kafka processors.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r169402643
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = null;
    +        if (context.hasIncomingConnection()) {
    +            flowFile = session.get();
    +
    +            if (flowFile == null) {
    --- End diff --
    
    Oversight on my part. Made the change.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @MikeThomsen Not sure I'll get a chance before the New Year, but I will take a look when I get some time.


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @mattyb149 This should be ready for merge now.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160803648
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,15 +100,36 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
         static List<PropertyDescriptor> descriptors = new ArrayList<>();
    --- End diff --
    
    Yeah, easy enough.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160953566
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -268,32 +248,29 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                                     String payload = buildBatch(batch, jsonTypeSetting);
                                     writeBatch(payload, context, session);
                                     batch = new ArrayList<>();
    -                            } catch (IOException ex) {
    +                            } catch (Exception ex) {
                                     getLogger().error("Error building batch", ex);
                                 }
                             }
                         }
                         if (batch.size() > 0) {
                             try {
                                 writeBatch(buildBatch(batch, jsonTypeSetting), context, session);
    -                        } catch (IOException ex) {
    +                        } catch (Exception ex) {
                                 getLogger().error("Error sending remainder of batch", ex);
                             }
                         }
                     } else {
                         while (cursor.hasNext()) {
                             flowFile = session.create();
    -                        flowFile = session.write(flowFile, new OutputStreamCallback() {
    -                            @Override
    -                            public void process(OutputStream out) throws IOException {
    -                                String json;
    -                                if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
    -                                    json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
    -                                } else {
    -                                    json = cursor.next().toJson();
    -                                }
    -                                IOUtils.write(json, out);
    +                        flowFile = session.write(flowFile, out -> {
    +                            String json;
    +                            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
    +                                json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
    +                            } else {
    +                                json = cursor.next().toJson();
                                 }
    +                            IOUtils.write(json, out);
    --- End diff --
    
    I didn't see a good way to do this with the Mongo API. Do you know of a way?


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r164200676
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java ---
    @@ -135,15 +135,15 @@ public void testValidators() {
             // invalid projection
             runner.setVariable("projection", "{a: x,y,z}");
             runner.setProperty(GetMongo.QUERY, "{a: 1}");
    -        runner.setProperty(GetMongo.PROJECTION, "${projection}");
    +        runner.setProperty(GetMongo.PROJECTION, "{a: z}");
    --- End diff --
    
    The test was reporting a false positive for me until I made that change. It seemed to treat x,y,z as a valid projection, but when I changed it to a:z it was recognized as invalid. Since none of that is valid JSON, I'm not sure why any of it is accepted rather than the Mongo client API throwing an exception when parsing it.
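
    One possible explanation, which I have not confirmed: if GetMongo's validator short-circuits on Expression Language the way AGG_VALIDATOR does in this PR, then "${projection}" is accepted without the variable's content ever being parsed. The sketch below only illustrates that shape; hasExpressionLanguage is a crude stand-in for context.isExpressionLanguagePresent(value), and the parser is passed in as a predicate.

```java
import java.util.function.Predicate;

public class ElShortCircuitSketch {
    // Crude stand-in for context.isExpressionLanguagePresent(value).
    static boolean hasExpressionLanguage(String value) {
        return value.contains("${");
    }

    // Same shape as AGG_VALIDATOR: if EL is present the value is accepted
    // as-is, otherwise the parser decides.
    static boolean isValid(String value, Predicate<String> parses) {
        if (hasExpressionLanguage(value)) {
            return true; // the referenced variable's content is never inspected
        }
        return parses.test(value);
    }

    public static void main(String[] args) {
        Predicate<String> rejectEverything = v -> false;
        System.out.println(isValid("${projection}", rejectEverything));
        System.out.println(isValid("{a: z}", rejectEverything));
    }
}
```

    If that is what is happening, a literal value is the only way for the test to exercise the parser at validation time.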


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166003772
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
    +        String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
    +        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
    +
    +        Map attrs = new HashMap();
    +        if (queryAttr != null && queryAttr.trim().length() > 0) {
    +            attrs.put(queryAttr, query);
    +        }
    +
    +        MongoCollection collection = getCollection(context);
    +        MongoCursor iter = null;
    +
    +        try {
    +            List<Bson> aggQuery = buildAggregationQuery(query);
    +            AggregateIterable it = collection.aggregate(aggQuery);
    +            it.batchSize(batchSize != null ? batchSize : 1);
    +
    +            iter = it.iterator();
    +            List batch = new ArrayList();
    +
    +            while (iter.hasNext()) {
    +                batch.add(iter.next());
    +                if (batch.size() == resultsPerFlowfile) {
    +                    writeBatch(buildBatch(batch), null, context, session, attrs, REL_RESULTS);
    --- End diff --
    
    Why hardcode the parent flowfile to null here? You can pass the flowFile reference instead; if it happens to be null, it will behave exactly as it does now.
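
    A minimal sketch of the loop with the parent threaded through, including the flush for a final partial batch. It uses plain collections so it can run on its own; Emitted and drain are names invented for the example, and the parent is a plain Object standing in for the incoming flow file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BatchWithParent {
    /** One emitted batch plus the parent it was derived from (null if none). */
    static final class Emitted {
        final List<String> results;
        final Object parent;

        Emitted(List<String> results, Object parent) {
            this.results = results;
            this.parent = parent;
        }
    }

    // Groups results into batches of resultsPerFlowFile and tags each batch
    // with the same parent reference, which may be null.
    static List<Emitted> drain(Iterator<String> iter, int resultsPerFlowFile, Object parent) {
        List<Emitted> out = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (iter.hasNext()) {
            batch.add(iter.next());
            if (batch.size() == resultsPerFlowFile) {
                out.add(new Emitted(batch, parent));
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            out.add(new Emitted(batch, parent)); // flush the remainder
        }
        return out;
    }

    public static void main(String[] args) {
        Object input = new Object();
        List<Emitted> batches = drain(Arrays.asList("a", "b", "c").iterator(), 2, input);
        System.out.println(batches.size());
    }
}
```

    The same call works whether or not there is an incoming flow file, so no branch is needed at the call site.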


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by mbolka <gi...@git.apache.org>.
Github user mbolka commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    When is this branch going to be merged?


---

[GitHub] nifi issue #2180: Added GetMongoAggregation to support running Mongo aggrega...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on the issue:

    https://github.com/apache/nifi/pull/2180
  
    @alopresto @milanchandna Can you take a look at the changes when you get a chance?


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r143060642
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -102,8 +107,26 @@
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    --- End diff --
    
    OK. Made that change and applied it to a few other property descriptors while I was at it.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r160447669
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query at user-defined intervals.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when it succeeds.")
    +            .name("Original")
    --- End diff --
    
    For consistency, the name should be lowercase. Also, the description is a little confusing with respect to the success relationship (see my other comments). Perhaps something like "Input flowfile(s) are sent to this relationship when the aggregation succeeds"?
      


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r165995911
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -152,21 +146,25 @@ public ValidationResult validate(final String subject, final String value, final
                 .displayName("JSON Type")
                 .name("json-type")
                 .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" +
    -            " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
    -            " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
    +                    " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " +
    +                    " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.")
                 .expressionLanguageSupported(false)
                 .required(true)
                 .build();
     
         private final static Set<Relationship> relationships;
         private final static List<PropertyDescriptor> propertyDescriptors;
     
    +    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
    +
         static {
             List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
             _propertyDescriptors.addAll(descriptors);
             _propertyDescriptors.add(JSON_TYPE);
             _propertyDescriptors.add(USE_PRETTY_PRINTING);
    +        _propertyDescriptors.add(CHARSET);
             _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    --- End diff --
    
    Since you are adding an (albeit optional) property to GetMongo, there should be a unit test to cover it.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166111239
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
    @@ -95,13 +101,50 @@
     
         public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
                 .name("Write Concern")
    +            .displayName("Write Concern")
                 .description("The write concern to use")
                 .required(true)
                 .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
                         WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
                 .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
                 .build();
     
    +    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
    +            .name("results-per-flowfile")
    +            .displayName("Results Per FlowFile")
    +            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1")
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .displayName("Batch Size")
    +            .description("The number of elements returned from the server in one batch.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query-attribute")
    +            .displayName("Query Output Attribute")
    +            .description("If set, the query will be written to a specified attribute on the output flowfiles.")
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +            .required(false)
    +            .build();
    +    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
    +            .name("el5-charset")
    --- End diff --
    
    Done.


---

[GitHub] nifi pull request #2180: Added GetMongoAggregation to support running Mongo ...

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2180#discussion_r166006323
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---
    @@ -0,0 +1,203 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.nifi.processors.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.AggregateIterable;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.bson.conversions.Bson;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +@Tags({"mongo", "aggregation", "aggregate"})
    +@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +public class RunMongoAggregation extends AbstractMongoProcessor {
    +
    +    private final static Set<Relationship> relationships;
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query succeeds.")
    +            .name("original")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .description("The input flowfile gets sent to this relationship when the query fails.")
    +            .name("failure")
    +            .build();
    +    static final Relationship REL_RESULTS = new Relationship.Builder()
    +            .description("The result set of the aggregation will be sent to this relationship.")
    +            .name("results")
    +            .build();
    +
    +    static final List<Bson> buildAggregationQuery(String query) throws IOException {
    +        List<Bson> result = new ArrayList<>();
    +
    +        ObjectMapper mapper = new ObjectMapper();
    +        List<Map> values = mapper.readValue(query, List.class);
    +        for (Map val : values) {
    +            result.add(new BasicDBObject(val));
    +        }
    +
    +        return result;
    +    }
    +
    +    public static final Validator AGG_VALIDATOR = (subject, value, context) -> {
    +        final ValidationResult.Builder builder = new ValidationResult.Builder();
    +        builder.subject(subject).input(value);
    +
    +        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
    +            return builder.valid(true).explanation("Contains Expression Language").build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            buildAggregationQuery(value);
    +        } catch (final RuntimeException | IOException e) {
    +            reason = e.getLocalizedMessage();
    +        }
    +
    +        return builder.explanation(reason).valid(reason == null).build();
    +    };
    +
    +    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
    +            .name("mongo-agg-query")
    +            .displayName("Query")
    +            .expressionLanguageSupported(true)
    +            .description("The aggregation query to be executed.")
    +            .required(true)
    +            .addValidator(AGG_VALIDATOR)
    +            .build();
    +
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.addAll(descriptors);
    +        _propertyDescriptors.add(CHARSET);
    +        _propertyDescriptors.add(QUERY);
    +        _propertyDescriptors.add(QUERY_ATTRIBUTE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
    +        _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
    +        _propertyDescriptors.add(CLIENT_AUTH);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        final Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_RESULTS);
    +        _relationships.add(REL_ORIGINAL);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    static String buildBatch(List batch) {
    +        ObjectMapper mapper = new ObjectMapper();
    +        String retVal;
    +        try {
    +            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0));
    +        } catch (Exception e) {
    +            retVal = null;
    +        }
    +
    +        return retVal;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    --- End diff --
    
    With multiple concurrent tasks, one of the tasks may get nothing back from session.get() even though a flow file was in the queue, because another task took it first. In that case some of the concurrent tasks behave in an event-driven fashion (based on an incoming flow file) and others behave in a scheduled fashion. This could get quite confusing in terms of expected behavior and can complicate debugging.
    
    If you want the processor to behave in an event-driven fashion when flow files are present, you can check whether there are any non-loop connections and react accordingly. For example, if there are non-loop connections into this processor, you can require that session.get() return a flow file; if there are no incoming connections, you don't have to call session.get() at all. Check GenerateTableFetch for an example of how the two behaviors are implemented.
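
A minimal sketch of the decision described above, written without the NiFi jars so it can run on its own. The `Context` and `Session` interfaces are hypothetical simplifications: `hasNonLoopConnection()` is modeled on the NiFi ProcessContext method of the same name, and `get()` on ProcessSession.get(), with a String standing in for a FlowFile. The `Action` enum and `decide` method are illustrative names, not part of the PR.

```java
public class InputHandlingSketch {

    /** Hypothetical stand-in for the part of ProcessContext used here. */
    public interface Context {
        boolean hasNonLoopConnection();
    }

    /** Hypothetical stand-in for ProcessSession; get() returns null if the queue is empty. */
    public interface Session {
        String get();
    }

    public enum Action { RUN_WITHOUT_INPUT, RUN_WITH_INPUT, DO_NOTHING }

    public static Action decide(Context context, Session session) {
        if (!context.hasNonLoopConnection()) {
            // No upstream connection: behave as a scheduled source and
            // never touch the session queue.
            return Action.RUN_WITHOUT_INPUT;
        }
        // Upstream connection exists: a flow file is required. A task that
        // comes up empty (another concurrent task took it) does no work.
        String flowFile = session.get();
        return flowFile == null ? Action.DO_NOTHING : Action.RUN_WITH_INPUT;
    }
}
```

With this shape every concurrent task follows the same rule, so a task that loses the race for a flow file returns without running the aggregation instead of falling back to scheduled behavior.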


---