You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/08/31 19:34:22 UTC

nifi git commit: NIFI-5544: GetMongo refactored; NIFI-5544: PR Review changes

Repository: nifi
Updated Branches:
  refs/heads/master 97e0f6a6a -> 05e32cff2


NIFI-5544: GetMongo refactored
NIFI-5544: PR Review changes

This closes #2958

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/05e32cff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/05e32cff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/05e32cff

Branch: refs/heads/master
Commit: 05e32cff28b44fcfc6504dc22fe6972d8f79b7eb
Parents: 97e0f6a
Author: zenfenan <si...@gmail.com>
Authored: Mon Aug 20 16:36:25 2018 +0530
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Fri Aug 31 15:32:44 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/mongodb/GetMongo.java       | 214 +++++++++----------
 .../nifi/processors/mongodb/GetMongoIT.java     |  35 +--
 2 files changed, 129 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/05e32cff/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index cf9e3d7..507c9ce 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -67,22 +67,26 @@ public class GetMongo extends AbstractMongoProcessor {
     static final String DB_NAME = "mongo.database.name";
     static final String COL_NAME = "mongo.collection.name";
 
-    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that have the results of a successful query execution go here.")
+            .build();
+
     static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("All input flowfiles that are part of a failed query execution go here.")
+            .description("All input FlowFiles that are part of a failed query execution go here.")
             .build();
 
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
-            .description("All input flowfiles that are part of a successful query execution go here.")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
             .build();
 
     static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
         .name("Query")
         .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
                 " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
-                "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
+                "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
                 "that will result in a full collection fetch using a \"{}\" query.")
         .required(false)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -96,6 +100,7 @@ public class GetMongo extends AbstractMongoProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(JsonValidator.INSTANCE)
             .build();
+
     static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
             .name("Sort")
             .description("The fields by which to sort; must be a valid BSON document")
@@ -103,6 +108,7 @@ public class GetMongo extends AbstractMongoProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(JsonValidator.INSTANCE)
             .build();
+
     static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
             .name("Limit")
             .description("The maximum number of elements to return")
@@ -113,7 +119,7 @@ public class GetMongo extends AbstractMongoProcessor {
 
     static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
             .name("Batch Size")
-            .description("The number of elements returned from the server in one batch")
+            .description("The number of elements to be returned from the server in one batch")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@@ -121,7 +127,7 @@ public class GetMongo extends AbstractMongoProcessor {
     static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
             .name("results-per-flowfile")
             .displayName("Results Per FlowFile")
-            .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+            .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@@ -129,11 +135,12 @@ public class GetMongo extends AbstractMongoProcessor {
 
     static final AllowableValue YES_PP = new AllowableValue("true", "True");
     static final AllowableValue NO_PP  = new AllowableValue("false", "False");
+
     static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder()
             .name("use-pretty-printing")
             .displayName("Pretty Print Results JSON")
             .description("Choose whether or not to pretty print the JSON from the results of the query. " +
-                    "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document")
+                    "Choosing 'True' can greatly increase the space requirements on disk depending on the complexity of the JSON document")
             .required(true)
             .defaultValue(YES_PP.getValue())
             .allowableValues(YES_PP, NO_PP)
@@ -142,6 +149,7 @@ public class GetMongo extends AbstractMongoProcessor {
 
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;
 
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
@@ -204,144 +212,134 @@ public class GetMongo extends AbstractMongoProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile input = null;
+        logger = getLogger();
+
         if (context.hasIncomingConnection()) {
             input = session.get();
-
             if (input == null && context.hasNonLoopConnection()) {
                 return;
             }
         }
 
-        final ComponentLog logger = getLogger();
+        final Document query = getQuery(context, session, input );
 
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        if (query == null) {
+            return;
+        }
 
-        final Document query;
+        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
+        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+        final Map<String, String> attributes = new HashMap<>();
 
-        String queryStr;
-        if (context.getProperty(QUERY).isSet()) {
-            queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
-            query = Document.parse(queryStr);
-        } else if (!context.getProperty(QUERY).isSet() && input == null) {
-            queryStr = "{}";
-            query = Document.parse("{}");
-        } else {
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-                session.exportTo(input, out);
-                out.close();
-                queryStr = new String(out.toByteArray());
-                query = Document.parse(queryStr);
-            } catch (Exception ex) {
-                getLogger().error("Error reading flowfile", ex);
-                if (input != null) { //Likely culprit is a bad query
-                    session.transfer(input, REL_FAILURE);
-                    return;
-                } else {
-                    throw new ProcessException(ex);
-                }
-            }
-        }
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
 
         if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
             final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
-            attributes.put(queryAttr, queryStr);
+            attributes.put(queryAttr, query.toJson());
         }
 
         final Document projection = context.getProperty(PROJECTION).isSet()
                 ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
         final Document sort = context.getProperty(SORT).isSet()
                 ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
-        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
-        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
-        configureMapper(jsonTypeSetting);
 
+        final MongoCollection<Document> collection = getCollection(context, input);
+        final FindIterable<Document> it = collection.find(query);
 
-        try {
-            final MongoCollection<Document> collection = getCollection(context, input);
+        attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
+        attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
 
-            attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
-            attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+        if (projection != null) {
+            it.projection(projection);
+        }
+        if (sort != null) {
+            it.sort(sort);
+        }
+        if (context.getProperty(LIMIT).isSet()) {
+            it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+        }
+        if (context.getProperty(BATCH_SIZE).isSet()) {
+            it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+        }
 
-            final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
-            if (projection != null) {
-                it.projection(projection);
-            }
-            if (sort != null) {
-                it.sort(sort);
-            }
-            if (context.getProperty(LIMIT).isSet()) {
-                it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
-            }
-            if (context.getProperty(BATCH_SIZE).isSet()) {
-                it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
-            }
+        try (MongoCursor<Document> cursor = it.iterator()) {
+            configureMapper(jsonTypeSetting);
 
-            final MongoCursor<Document> cursor = it.iterator();
-            ComponentLog log = getLogger();
-            try {
-                FlowFile outgoingFlowFile;
-                if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
-                    int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
-                    List<Document> batch = new ArrayList<>();
-
-                    while (cursor.hasNext()) {
-                        batch.add(cursor.next());
-                        if (batch.size() == ceiling) {
-                            try {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Writing batch...");
-                                }
-                                String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
-                                writeBatch(payload, input, context, session, attributes, REL_SUCCESS);
-                                batch = new ArrayList<>();
-                            } catch (Exception ex) {
-                                getLogger().error("Error building batch", ex);
-                            }
-                        }
-                    }
-                    if (batch.size() > 0) {
+            if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
+                int sizePerBatch = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
+                List<Document> batch = new ArrayList<>();
+
+                while (cursor.hasNext()) {
+                    batch.add(cursor.next());
+
+                    if (batch.size() == sizePerBatch) {
                         try {
                             writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
-                        } catch (Exception ex) {
-                            getLogger().error("Error sending remainder of batch", ex);
+                            batch = new ArrayList<>();
+                        } catch (Exception e) {
+                            logger.error("Error building batch due to {}", new Object[] {e});
                         }
                     }
-                } else {
-                    while (cursor.hasNext()) {
-                        outgoingFlowFile = (input == null) ? session.create() : session.create(input);
-                        outgoingFlowFile = session.write(outgoingFlowFile, out -> {
-                            String json;
-                            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
-                                json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next());
-                            } else {
-                                json = cursor.next().toJson();
-                            }
-                            out.write(json.getBytes(charset));
-                        });
-                        outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
-
-                        session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
-                        session.transfer(outgoingFlowFile, REL_SUCCESS);
-                    }
                 }
 
-                if (input != null) {
-                    session.transfer(input, REL_ORIGINAL);
+                if (batch.size() > 0) {
+                    try {
+                        writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
+                    } catch (Exception e) {
+                        logger.error("Error building batch due to {}", new Object[] {e});
+                    }
                 }
+            } else {
+                FlowFile outgoingFlowFile;
+
+                while (cursor.hasNext()) {
+                    outgoingFlowFile = (input == null) ? session.create() : session.create(input);
+                    outgoingFlowFile = session.write(outgoingFlowFile, out -> {
+                        if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
+                            out.write(getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()).getBytes(charset));
+                        } else {
+                            out.write(cursor.next().toJson().getBytes(charset));
+                        }
+                    });
 
-            } finally {
-                cursor.close();
+                    outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
+                    session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
+                    session.transfer(outgoingFlowFile, REL_SUCCESS);
+                }
             }
 
-        } catch (final RuntimeException e) {
             if (input != null) {
-                session.transfer(input, REL_FAILURE);
+                session.transfer(input, REL_ORIGINAL);
+            }
+        }
+
+    }
+
+    private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+        Document query = null;
+        if (context.getProperty(QUERY).isSet()) {
+            query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
+        } else if (!context.getProperty(QUERY).isSet() && input == null) {
+            query = Document.parse("{}");
+        } else {
+            try {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                session.exportTo(input, out);
+                out.close();
+                query = Document.parse(new String(out.toByteArray()));
+            } catch (Exception ex) {
+                logger.error("Error reading FlowFile : ", ex);
+                if (input != null) { //Likely culprit is a bad query
+                    session.transfer(input, REL_FAILURE);
+                    session.commit();
+                } else {
+                    throw new ProcessException(ex);
+                }
             }
-            context.yield();
-            logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
         }
+
+        return query;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/05e32cff/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index aaf6fa3..08bc8cf 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -194,8 +194,8 @@ public class GetMongoIT {
     public void testReadMultipleDocuments() throws Exception {
         runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
         runner.run();
-
         runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
+
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         for (int i=0; i < flowFiles.size(); i++) {
             flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
@@ -313,7 +313,7 @@ public class GetMongoIT {
         runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr);
         runner.run();
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
-        testQueryAttribute(attr, "{}");
+        testQueryAttribute(attr, "{ }");
 
         runner.clearTransferState();
 
@@ -323,7 +323,7 @@ public class GetMongoIT {
         runner.removeProperty(GetMongo.QUERY);
         runner.setIncomingConnection(false);
         runner.run();
-        testQueryAttribute(attr, "{}");
+        testQueryAttribute(attr, "{ }");
 
         runner.clearTransferState();
 
@@ -334,7 +334,7 @@ public class GetMongoIT {
         runner.setIncomingConnection(true);
         runner.enqueue("{}");
         runner.run();
-        testQueryAttribute(attr, "{}");
+        testQueryAttribute(attr, "{ }");
 
         /*
          * Input flowfile with invalid query
@@ -478,6 +478,7 @@ public class GetMongoIT {
      */
     @Test
     public void testDatabaseEL() {
+        runner.clearTransferState();
         runner.removeVariable("collection");
         runner.removeVariable("db");
         runner.setIncomingConnection(true);
@@ -506,26 +507,36 @@ public class GetMongoIT {
         }
 
         Map<String, Map<String, String>> vals = new HashMap<String, Map<String, String>>(){{
-            put("Database", new HashMap<String, String>(){{
-                put("db", "");
-                put("collection", "test");
-            }});
             put("Collection", new HashMap<String, String>(){{
                 put("db", "getmongotest");
                 put("collection", "");
             }});
+            put("Database", new HashMap<String, String>(){{
+                put("db", "");
+                put("collection", "test");
+            }});
         }};
 
+        TestRunner tmpRunner;
+
         for (Map.Entry<String, Map<String, String>> entry : vals.entrySet()) {
-            runner.enqueue("{}", entry.getValue());
+            // Create a new runner for each attribute map, because each subsequent run will attempt to take the topmost enqueued FlowFile
+            tmpRunner = TestRunners.newTestRunner(GetMongo.class);
+            tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+            tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
+            tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+            tmpRunner.setIncomingConnection(true);
+
+            tmpRunner.enqueue("{ }", entry.getValue());
+
             try {
-                runner.run();
+                tmpRunner.run();
             } catch (Throwable ex) {
                 Throwable cause = ex.getCause();
                 Assert.assertTrue(cause instanceof ProcessException);
-                Assert.assertTrue(entry.getKey(), cause.getMessage().contains(entry.getKey()));
+                Assert.assertTrue(entry.getKey(), ex.getMessage().contains(entry.getKey()));
             }
-            runner.clearTransferState();
+            tmpRunner.clearTransferState();
 
         }
     }