You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/06/01 15:28:15 UTC
[nifi] branch master updated: NIFI-5916 Added an option to enable
empty flowfiles to be sent if there are no results from a query.
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 407add7 NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query.
407add7 is described below
commit 407add7847f9d79ea9c7b604fd57c3160ff30c06
Author: Mike Thomsen <mi...@gmail.com>
AuthorDate: Sat Feb 16 07:27:25 2019 -0500
NIFI-5916 Added an option to enable empty flowfiles to be sent if there are no results from a query.
NIFI-5916 Fixed potential NPE.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3315
---
.../apache/nifi/processors/mongodb/GetMongo.java | 30 +++++++++++++++++++++-
.../apache/nifi/processors/mongodb/GetMongoIT.java | 21 +++++++++++++--
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 286c47e..b3b82b0 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -29,15 +29,18 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;
@@ -58,7 +61,16 @@ import java.util.Set;
@WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
})
public class GetMongo extends AbstractMongoQueryProcessor {
-
+ /**
+ * Optional boolean property (default "false"). When set to "true" and a query completes
+ * without returning any documents, the processor still emits one flowfile with no content
+ * to the success relationship so downstream flows can react to the "no results" case.
+ */
+ public static final PropertyDescriptor SEND_EMPTY_RESULTS = new PropertyDescriptor.Builder()
+ .name("get-mongo-send-empty")
+ .displayName("Send Empty Result")
+ .description("If a query executes successfully, but returns no results, send an empty flowfile " +
+ "(with no content) to the success relationship signifying no result.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .required(false)
+ .build();
static final AllowableValue YES_PP = new AllowableValue("true", "True");
static final AllowableValue NO_PP = new AllowableValue("false", "False");
@@ -94,6 +106,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
_propertyDescriptors.add(DATE_FORMAT);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
+ _propertyDescriptors.add(SEND_EMPTY_RESULTS);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
@@ -103,6 +116,12 @@ public class GetMongo extends AbstractMongoQueryProcessor {
relationships = Collections.unmodifiableSet(_relationships);
}
+ private boolean sendEmpty;
+ /**
+ * Lifecycle hook (annotated with {@code @OnScheduled}) that reads the
+ * {@link #SEND_EMPTY_RESULTS} property and stores its boolean value in {@code sendEmpty}.
+ *
+ * @param context the context from which the property value is read
+ */
+ @OnScheduled
+ public void onScheduled(PropertyContext context) {
+ final boolean sendWhenEmpty = context.getProperty(SEND_EMPTY_RESULTS).asBoolean();
+ this.sendEmpty = sendWhenEmpty;
+ }
+
@Override
public Set<Relationship> getRelationships() {
return relationships;
@@ -191,6 +210,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
}
+ long sent = 0;
try (MongoCursor<Document> cursor = it.iterator()) {
configureMapper(jsonTypeSetting, dateFormat);
@@ -209,6 +229,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
logger.error("Error building batch due to {}", new Object[] {e});
}
}
+ sent++;
}
if (batch.size() > 0) {
@@ -234,12 +255,19 @@ public class GetMongo extends AbstractMongoQueryProcessor {
outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
session.transfer(outgoingFlowFile, REL_SUCCESS);
+ sent++;
}
}
if (input != null) {
session.transfer(input, REL_ORIGINAL);
}
+
+ if (sent == 0 && sendEmpty) {
+ FlowFile empty = input != null ? session.create(input) : session.create();
+ empty = session.putAllAttributes(empty, attributes);
+ session.transfer(empty, REL_SUCCESS);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 286a70d..ddea9a8 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -620,7 +620,7 @@ public class GetMongoIT {
//Test a bad flowfile attribute
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, "${badfromff}");
- runner.enqueue("<<?>>", new HashMap<String, String>(){{
+ runner.enqueue("<<?>>", new HashMap<String, String>() {{
put("badfromff", "{\"prop\":}");
}});
runner.run();
@@ -633,7 +633,7 @@ public class GetMongoIT {
//Test for regression on a good query from a flowfile attribute
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, "${badfromff}");
- runner.enqueue("<<?>>", new HashMap<String, String>(){{
+ runner.enqueue("<<?>>", new HashMap<String, String>() {{
put("badfromff", "{}");
}});
runner.run();
@@ -651,4 +651,21 @@ public class GetMongoIT {
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
}
+
+ /**
+ * Verifies the Send Empty Result option: with the property set to "true" and a query that
+ * is expected to match no documents, the input flowfile goes to "original" and exactly one
+ * zero-length flowfile is routed to "success".
+ */
+ // Without @Test a JUnit 4 runner never executes this method, so the feature would go untested.
+ @Test
+ public void testSendEmpty() throws Exception {
+ runner.setIncomingConnection(true);
+ runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true");
+ runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }");
+ runner.assertValid();
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+ runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+ runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+
+ // The single success flowfile must carry no content.
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+ MockFlowFile flowFile = flowFiles.get(0);
+ Assert.assertEquals(0, flowFile.getSize());
+ }
}