Posted to issues@nifi.apache.org by zenfenan <gi...@git.apache.org> on 2018/08/21 17:53:47 UTC
[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored
GitHub user zenfenan opened a pull request:
https://github.com/apache/nifi/pull/2958
NIFI-5544: GetMongo refactored
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?
### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zenfenan/nifi NIFI-5544
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/2958.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2958
----
commit 9477bfd7b65a943b4faced2259131c59c0051a4b
Author: zenfenan <si...@...>
Date: 2018-08-20T11:06:25Z
NIFI-5544: GetMongo refactored
----
---
[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored
Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2958#discussion_r214022342
--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile input = null;
+ logger = getLogger();
+
if (context.hasIncomingConnection()) {
input = session.get();
-
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
- final ComponentLog logger = getLogger();
+ final Document query = getQuery(context, session, input );
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ if (query == null) {
+ return;
+ }
- final Document query;
+ final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
+ final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+ final Map<String, String> attributes = new HashMap<>();
- String queryStr;
- if (context.getProperty(QUERY).isSet()) {
- queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
- query = Document.parse(queryStr);
- } else if (!context.getProperty(QUERY).isSet() && input == null) {
- queryStr = "{}";
- query = Document.parse("{}");
- } else {
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- session.exportTo(input, out);
- out.close();
- queryStr = new String(out.toByteArray());
- query = Document.parse(queryStr);
- } catch (Exception ex) {
- getLogger().error("Error reading flowfile", ex);
- if (input != null) { //Likely culprit is a bad query
- session.transfer(input, REL_FAILURE);
- return;
- } else {
- throw new ProcessException(ex);
- }
- }
- }
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
- attributes.put(queryAttr, queryStr);
+ attributes.put(queryAttr, query.toJson());
}
final Document projection = context.getProperty(PROJECTION).isSet()
? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
final Document sort = context.getProperty(SORT).isSet()
? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
- final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
- final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
- configureMapper(jsonTypeSetting);
+ final MongoCollection<Document> collection = getCollection(context, input);
+ final FindIterable<Document> it = collection.find(query);
- try {
- final MongoCollection<Document> collection = getCollection(context, input);
+ attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
+ attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
- attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
- attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+ if (projection != null) {
+ it.projection(projection);
+ }
+ if (sort != null) {
+ it.sort(sort);
+ }
+ if (context.getProperty(LIMIT).isSet()) {
+ it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+ }
+ if (context.getProperty(BATCH_SIZE).isSet()) {
+ it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+ }
- final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
- if (projection != null) {
- it.projection(projection);
- }
- if (sort != null) {
- it.sort(sort);
- }
- if (context.getProperty(LIMIT).isSet()) {
- it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
- }
- if (context.getProperty(BATCH_SIZE).isSet()) {
- it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+ try (MongoCursor<Document> cursor = it.iterator()) {
+ final List<Document> listOfDocuments = new ArrayList<>();
+ Document doc;
+
+ configureMapper(jsonTypeSetting);
+
+ while ((doc = cursor.tryNext()) != null) {
--- End diff --
Fair point. I'll work on that.
---
[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored
Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2958#discussion_r213979035
--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
[quoted diff context identical to the hunk above]
--- End diff --
We can't load the result set into a list like this because it's easy for a Mongo result set to overflow the heap.
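To illustrate the concern: instead of accumulating every document from the cursor into one in-memory list, the loop can flush fixed-size batches so at most one batch is held on the heap at a time. This is a minimal, self-contained sketch of the batching pattern only; the `drainInBatches` helper and its names are hypothetical, and a real fix inside GetMongo would write each flushed batch to a FlowFile via the ProcessSession rather than print it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BatchFlusher {

    /**
     * Drains an iterator (standing in for a MongoCursor) in fixed-size
     * batches, so memory use is bounded by batchSize rather than by the
     * total size of the result set. Returns the number of items drained.
     */
    static <T> int drainInBatches(Iterator<T> cursor, int batchSize, Consumer<List<T>> flush) {
        List<T> batch = new ArrayList<>(batchSize);
        int drained = 0;
        while (cursor.hasNext()) {
            batch.add(cursor.next());
            if (batch.size() >= batchSize) {
                flush.accept(batch);          // e.g. write one FlowFile per batch
                drained += batch.size();
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) {               // flush the final partial batch
            flush.accept(batch);
            drained += batch.size();
        }
        return drained;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            docs.add("doc-" + i);
        }
        final int[] flushes = {0};
        int total = drainInBatches(docs.iterator(), 4, b -> flushes[0]++);
        // 10 documents in batches of 4 -> 3 flushes (4, 4, 2)
        System.out.println(total + " docs in " + flushes[0] + " flushes");
    }
}
```

The key property is that `batch` is replaced after each flush, so the garbage collector can reclaim earlier batches while the cursor is still being drained.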
---
[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored
Posted by MikeThomsen <gi...@git.apache.org>.
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2958#discussion_r213977393
--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
[quoted diff context identical to the opening of the hunk above]
+ final Document query = getQuery(context, session, input );
--- End diff --
I think I actually refactored this in another one of my Mongo commits... :-D
---
[GitHub] nifi pull request #2958: NIFI-5544: GetMongo refactored
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/nifi/pull/2958
---