Posted to issues@nifi.apache.org by zenfenan <gi...@git.apache.org> on 2018/08/21 17:53:47 UTC

[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored

GitHub user zenfenan opened a pull request:

    https://github.com/apache/nifi/pull/2958

    NIFI-5544: GetMongo refactored

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install in the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zenfenan/nifi NIFI-5544

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2958
    
----
commit 9477bfd7b65a943b4faced2259131c59c0051a4b
Author: zenfenan <si...@...>
Date:   2018-08-20T11:06:25Z

    NIFI-5544: GetMongo refactored

----


---

[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2958#discussion_r214022342
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
             FlowFile input = null;
    +        logger = getLogger();
    +
             if (context.hasIncomingConnection()) {
                 input = session.get();
    -
                 if (input == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
    -        final ComponentLog logger = getLogger();
    +        final Document query = getQuery(context, session, input );
     
    -        Map<String, String> attributes = new HashMap<>();
    -        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (query == null) {
    +            return;
    +        }
     
    -        final Document query;
    +        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
    +        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
             final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
    +        final Map<String, String> attributes = new HashMap<>();
     
    -        String queryStr;
    -        if (context.getProperty(QUERY).isSet()) {
    -            queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
    -            query = Document.parse(queryStr);
    -        } else if (!context.getProperty(QUERY).isSet() && input == null) {
    -            queryStr = "{}";
    -            query = Document.parse("{}");
    -        } else {
    -            try {
    -                ByteArrayOutputStream out = new ByteArrayOutputStream();
    -                session.exportTo(input, out);
    -                out.close();
    -                queryStr = new String(out.toByteArray());
    -                query = Document.parse(queryStr);
    -            } catch (Exception ex) {
    -                getLogger().error("Error reading flowfile", ex);
    -                if (input != null) { //Likely culprit is a bad query
    -                    session.transfer(input, REL_FAILURE);
    -                    return;
    -                } else {
    -                    throw new ProcessException(ex);
    -                }
    -            }
    -        }
    +        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
     
             if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
                 final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
    -            attributes.put(queryAttr, queryStr);
    +            attributes.put(queryAttr, query.toJson());
             }
     
             final Document projection = context.getProperty(PROJECTION).isSet()
                     ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
             final Document sort = context.getProperty(SORT).isSet()
                     ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
    -        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
    -        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
    -        configureMapper(jsonTypeSetting);
     
    +        final MongoCollection<Document> collection = getCollection(context, input);
    +        final FindIterable<Document> it = collection.find(query);
     
    -        try {
    -            final MongoCollection<Document> collection = getCollection(context, input);
    +        attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
    +        attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
     
    -            attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
    -            attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
    +        if (projection != null) {
    +            it.projection(projection);
    +        }
    +        if (sort != null) {
    +            it.sort(sort);
    +        }
    +        if (context.getProperty(LIMIT).isSet()) {
    +            it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
    +        }
    +        if (context.getProperty(BATCH_SIZE).isSet()) {
    +            it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
    +        }
     
    -            final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
    -            if (projection != null) {
    -                it.projection(projection);
    -            }
    -            if (sort != null) {
    -                it.sort(sort);
    -            }
    -            if (context.getProperty(LIMIT).isSet()) {
    -                it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
    -            }
    -            if (context.getProperty(BATCH_SIZE).isSet()) {
    -                it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
    +        try (MongoCursor<Document> cursor = it.iterator()) {
    +            final List<Document> listOfDocuments = new ArrayList<>();
    +            Document doc;
    +
    +            configureMapper(jsonTypeSetting);
    +
    +            while ((doc = cursor.tryNext()) != null) {
    --- End diff --
    
    Fair point. I'll work on that.


---

[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2958#discussion_r213979035
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
             FlowFile input = null;
    +        logger = getLogger();
    +
             if (context.hasIncomingConnection()) {
                 input = session.get();
    -
                 if (input == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
    -        final ComponentLog logger = getLogger();
    +        final Document query = getQuery(context, session, input );
     
    -        Map<String, String> attributes = new HashMap<>();
    -        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (query == null) {
    +            return;
    +        }
     
    -        final Document query;
    +        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
    +        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
             final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
    +        final Map<String, String> attributes = new HashMap<>();
     
    -        String queryStr;
    -        if (context.getProperty(QUERY).isSet()) {
    -            queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
    -            query = Document.parse(queryStr);
    -        } else if (!context.getProperty(QUERY).isSet() && input == null) {
    -            queryStr = "{}";
    -            query = Document.parse("{}");
    -        } else {
    -            try {
    -                ByteArrayOutputStream out = new ByteArrayOutputStream();
    -                session.exportTo(input, out);
    -                out.close();
    -                queryStr = new String(out.toByteArray());
    -                query = Document.parse(queryStr);
    -            } catch (Exception ex) {
    -                getLogger().error("Error reading flowfile", ex);
    -                if (input != null) { //Likely culprit is a bad query
    -                    session.transfer(input, REL_FAILURE);
    -                    return;
    -                } else {
    -                    throw new ProcessException(ex);
    -                }
    -            }
    -        }
    +        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
     
             if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
                 final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
    -            attributes.put(queryAttr, queryStr);
    +            attributes.put(queryAttr, query.toJson());
             }
     
             final Document projection = context.getProperty(PROJECTION).isSet()
                     ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
             final Document sort = context.getProperty(SORT).isSet()
                     ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
    -        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
    -        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
    -        configureMapper(jsonTypeSetting);
     
    +        final MongoCollection<Document> collection = getCollection(context, input);
    +        final FindIterable<Document> it = collection.find(query);
     
    -        try {
    -            final MongoCollection<Document> collection = getCollection(context, input);
    +        attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
    +        attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
     
    -            attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
    -            attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
    +        if (projection != null) {
    +            it.projection(projection);
    +        }
    +        if (sort != null) {
    +            it.sort(sort);
    +        }
    +        if (context.getProperty(LIMIT).isSet()) {
    +            it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
    +        }
    +        if (context.getProperty(BATCH_SIZE).isSet()) {
    +            it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
    +        }
     
    -            final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
    -            if (projection != null) {
    -                it.projection(projection);
    -            }
    -            if (sort != null) {
    -                it.sort(sort);
    -            }
    -            if (context.getProperty(LIMIT).isSet()) {
    -                it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
    -            }
    -            if (context.getProperty(BATCH_SIZE).isSet()) {
    -                it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
    +        try (MongoCursor<Document> cursor = it.iterator()) {
    +            final List<Document> listOfDocuments = new ArrayList<>();
    +            Document doc;
    +
    +            configureMapper(jsonTypeSetting);
    +
    +            while ((doc = cursor.tryNext()) != null) {
    --- End diff --
    
    We can't load the result set into a list like this because it's easy for a Mongo result set to overflow the heap.
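
    The streaming alternative this comment points toward can be sketched as follows. This is a hypothetical illustration, not the actual NiFi code: a plain Iterator<String> stands in for MongoCursor<Document>, and the `sink` list stands in for writing one FlowFile per batch, so the pattern is runnable without a database.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Sketch: drain the cursor in fixed-size batches and flush each batch,
    // instead of accumulating every document into a single in-memory list.
    // Memory use is bounded by batchSize regardless of result-set size.
    public class BatchDrain {
        // Flushes up to 'batchSize' documents at a time into 'sink'
        // (a stand-in for per-batch FlowFile writes); returns flush count.
        static int drainInBatches(Iterator<String> cursor, int batchSize,
                                  List<List<String>> sink) {
            List<String> buffer = new ArrayList<>();
            int flushes = 0;
            while (cursor.hasNext()) {
                buffer.add(cursor.next());
                if (buffer.size() >= batchSize) {
                    sink.add(new ArrayList<>(buffer));
                    buffer.clear();
                    flushes++;
                }
            }
            if (!buffer.isEmpty()) { // flush the final partial batch
                sink.add(new ArrayList<>(buffer));
                flushes++;
            }
            return flushes;
        }
    }
    ```

    With this shape, only one batch is resident at a time, which avoids the heap-overflow risk of collecting the whole result set first.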


---

[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored

Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2958#discussion_r213977393
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
             FlowFile input = null;
    +        logger = getLogger();
    +
             if (context.hasIncomingConnection()) {
                 input = session.get();
    -
                 if (input == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
    -        final ComponentLog logger = getLogger();
    +        final Document query = getQuery(context, session, input );
    --- End diff --
    
    I think I actually refactored this in another one of my Mongo commits... :-D
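
    The refactoring being discussed (collapsing the old three-branch query construction into a single getQuery helper) might look roughly like the sketch below. This is purely illustrative: the helper name and parameters here are hypothetical stand-ins, with the property lookup and FlowFile read reduced to plain String arguments so the branch order is visible and runnable.

    ```java
    // Sketch of the query-resolution precedence from the original code:
    // 1. an explicit Query property wins;
    // 2. no property and no incoming FlowFile means match-all ("{}");
    // 3. otherwise the FlowFile content is treated as the query.
    public class QueryHelper {
        // queryProperty: value of the Query property, or null if unset.
        // flowFileBody: incoming FlowFile content, or null if no input.
        static String resolveQuery(String queryProperty, String flowFileBody) {
            if (queryProperty != null) {
                return queryProperty;
            }
            if (flowFileBody == null) {
                return "{}";
            }
            return flowFileBody;
        }
    }
    ```

    In the real processor the result would then be passed through Document.parse, with a parse failure routing the input FlowFile to REL_FAILURE as in the original branch.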


---

[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2958


---