Posted to commits@nifi.apache.org by mt...@apache.org on 2018/08/31 19:34:22 UTC
nifi git commit: NIFI-5544: GetMongo refactored NIFI-5544: PR Review changes
Repository: nifi
Updated Branches:
refs/heads/master 97e0f6a6a -> 05e32cff2
NIFI-5544: GetMongo refactored
NIFI-5544: PR Review changes
This closes #2958
Signed-off-by: Mike Thomsen <mi...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/05e32cff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/05e32cff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/05e32cff
Branch: refs/heads/master
Commit: 05e32cff28b44fcfc6504dc22fe6972d8f79b7eb
Parents: 97e0f6a
Author: zenfenan <si...@gmail.com>
Authored: Mon Aug 20 16:36:25 2018 +0530
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Fri Aug 31 15:32:44 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/mongodb/GetMongo.java | 214 +++++++++----------
.../nifi/processors/mongodb/GetMongoIT.java | 35 +--
2 files changed, 129 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/05e32cff/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index cf9e3d7..507c9ce 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -67,22 +67,26 @@ public class GetMongo extends AbstractMongoProcessor {
static final String DB_NAME = "mongo.database.name";
static final String COL_NAME = "mongo.collection.name";
- static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that have the results of a successful query execution go here.")
+ .build();
+
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
- .description("All input flowfiles that are part of a failed query execution go here.")
+ .description("All input FlowFiles that are part of a failed query execution go here.")
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
- .description("All input flowfiles that are part of a successful query execution go here.")
+ .description("All input FlowFiles that are part of a successful query execution go here.")
.build();
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("Query")
.description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
" an incoming connection from another processor to provide the query as a valid JSON document inside of " +
- "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
+ "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
"that will result in a full collection fetch using a \"{}\" query.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -96,6 +100,7 @@ public class GetMongo extends AbstractMongoProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
+
static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("Sort")
.description("The fields by which to sort; must be a valid BSON document")
@@ -103,6 +108,7 @@ public class GetMongo extends AbstractMongoProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.build();
+
static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
.name("Limit")
.description("The maximum number of elements to return")
@@ -113,7 +119,7 @@ public class GetMongo extends AbstractMongoProcessor {
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
- .description("The number of elements returned from the server in one batch")
+ .description("The number of elements to be returned from the server in one batch")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@@ -121,7 +127,7 @@ public class GetMongo extends AbstractMongoProcessor {
static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("results-per-flowfile")
.displayName("Results Per FlowFile")
- .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+ .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@@ -129,11 +135,12 @@ public class GetMongo extends AbstractMongoProcessor {
static final AllowableValue YES_PP = new AllowableValue("true", "True");
static final AllowableValue NO_PP = new AllowableValue("false", "False");
+
static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder()
.name("use-pretty-printing")
.displayName("Pretty Print Results JSON")
.description("Choose whether or not to pretty print the JSON from the results of the query. " +
- "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document")
+ "Choosing 'True' can greatly increase the space requirements on disk depending on the complexity of the JSON document")
.required(true)
.defaultValue(YES_PP.getValue())
.allowableValues(YES_PP, NO_PP)
@@ -142,6 +149,7 @@ public class GetMongo extends AbstractMongoProcessor {
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
+ private ComponentLog logger;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
@@ -204,144 +212,134 @@ public class GetMongo extends AbstractMongoProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile input = null;
+ logger = getLogger();
+
if (context.hasIncomingConnection()) {
input = session.get();
-
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
- final ComponentLog logger = getLogger();
+ final Document query = getQuery(context, session, input );
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ if (query == null) {
+ return;
+ }
- final Document query;
+ final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
+ final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+ final Map<String, String> attributes = new HashMap<>();
- String queryStr;
- if (context.getProperty(QUERY).isSet()) {
- queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
- query = Document.parse(queryStr);
- } else if (!context.getProperty(QUERY).isSet() && input == null) {
- queryStr = "{}";
- query = Document.parse("{}");
- } else {
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- session.exportTo(input, out);
- out.close();
- queryStr = new String(out.toByteArray());
- query = Document.parse(queryStr);
- } catch (Exception ex) {
- getLogger().error("Error reading flowfile", ex);
- if (input != null) { //Likely culprit is a bad query
- session.transfer(input, REL_FAILURE);
- return;
- } else {
- throw new ProcessException(ex);
- }
- }
- }
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
- attributes.put(queryAttr, queryStr);
+ attributes.put(queryAttr, query.toJson());
}
final Document projection = context.getProperty(PROJECTION).isSet()
? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
final Document sort = context.getProperty(SORT).isSet()
? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
- final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
- final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
- configureMapper(jsonTypeSetting);
+ final MongoCollection<Document> collection = getCollection(context, input);
+ final FindIterable<Document> it = collection.find(query);
- try {
- final MongoCollection<Document> collection = getCollection(context, input);
+ attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
+ attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
- attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
- attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
+ if (projection != null) {
+ it.projection(projection);
+ }
+ if (sort != null) {
+ it.sort(sort);
+ }
+ if (context.getProperty(LIMIT).isSet()) {
+ it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+ }
+ if (context.getProperty(BATCH_SIZE).isSet()) {
+ it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+ }
- final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
- if (projection != null) {
- it.projection(projection);
- }
- if (sort != null) {
- it.sort(sort);
- }
- if (context.getProperty(LIMIT).isSet()) {
- it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
- }
- if (context.getProperty(BATCH_SIZE).isSet()) {
- it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
- }
+ try (MongoCursor<Document> cursor = it.iterator()) {
+ configureMapper(jsonTypeSetting);
- final MongoCursor<Document> cursor = it.iterator();
- ComponentLog log = getLogger();
- try {
- FlowFile outgoingFlowFile;
- if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
- int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
- List<Document> batch = new ArrayList<>();
-
- while (cursor.hasNext()) {
- batch.add(cursor.next());
- if (batch.size() == ceiling) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Writing batch...");
- }
- String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
- writeBatch(payload, input, context, session, attributes, REL_SUCCESS);
- batch = new ArrayList<>();
- } catch (Exception ex) {
- getLogger().error("Error building batch", ex);
- }
- }
- }
- if (batch.size() > 0) {
+ if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
+ int sizePerBatch = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
+ List<Document> batch = new ArrayList<>();
+
+ while (cursor.hasNext()) {
+ batch.add(cursor.next());
+
+ if (batch.size() == sizePerBatch) {
try {
writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
- } catch (Exception ex) {
- getLogger().error("Error sending remainder of batch", ex);
+ batch = new ArrayList<>();
+ } catch (Exception e) {
+ logger.error("Error building batch due to {}", new Object[] {e});
}
}
- } else {
- while (cursor.hasNext()) {
- outgoingFlowFile = (input == null) ? session.create() : session.create(input);
- outgoingFlowFile = session.write(outgoingFlowFile, out -> {
- String json;
- if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
- json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next());
- } else {
- json = cursor.next().toJson();
- }
- out.write(json.getBytes(charset));
- });
- outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
-
- session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
- session.transfer(outgoingFlowFile, REL_SUCCESS);
- }
}
- if (input != null) {
- session.transfer(input, REL_ORIGINAL);
+ if (batch.size() > 0) {
+ try {
+ writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
+ } catch (Exception e) {
+ logger.error("Error building batch due to {}", new Object[] {e});
+ }
}
+ } else {
+ FlowFile outgoingFlowFile;
+
+ while (cursor.hasNext()) {
+ outgoingFlowFile = (input == null) ? session.create() : session.create(input);
+ outgoingFlowFile = session.write(outgoingFlowFile, out -> {
+ if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
+ out.write(getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()).getBytes(charset));
+ } else {
+ out.write(cursor.next().toJson().getBytes(charset));
+ }
+ });
- } finally {
- cursor.close();
+ outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
+ session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
+ session.transfer(outgoingFlowFile, REL_SUCCESS);
+ }
}
- } catch (final RuntimeException e) {
if (input != null) {
- session.transfer(input, REL_FAILURE);
+ session.transfer(input, REL_ORIGINAL);
+ }
+ }
+
+ }
+
+ private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+ Document query = null;
+ if (context.getProperty(QUERY).isSet()) {
+ query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
+ } else if (!context.getProperty(QUERY).isSet() && input == null) {
+ query = Document.parse("{}");
+ } else {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ session.exportTo(input, out);
+ out.close();
+ query = Document.parse(new String(out.toByteArray()));
+ } catch (Exception ex) {
+ logger.error("Error reading FlowFile : ", ex);
+ if (input != null) { //Likely culprit is a bad query
+ session.transfer(input, REL_FAILURE);
+ session.commit();
+ } else {
+ throw new ProcessException(ex);
+ }
}
- context.yield();
- logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
}
+
+ return query;
}
+
}
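----------------------------------------------------------------------
For context, the two patterns this refactor leans on are shown below in a minimal standalone form: query resolution with an explicit precedence (the Query property first, an implicit "{}" full-collection fetch when there is no input FlowFile, otherwise the FlowFile body), and cursor iteration via try-with-resources instead of an explicit finally/close. This is an illustrative sketch assuming the standard MongoDB sync Java driver; the class name, connection string, and database/collection names are placeholders, not code from this commit.

    import java.nio.charset.StandardCharsets;

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import org.bson.Document;

    public class GetMongoPatternsSketch {

        // Mirrors the precedence implemented by the new getQuery() helper:
        // explicit query string first, then an implicit full-collection "{}"
        // when there is no input, otherwise the input FlowFile's body.
        static Document resolveQuery(String queryProperty, byte[] inputBody) {
            if (queryProperty != null) {
                return Document.parse(queryProperty);
            } else if (inputBody == null) {
                return Document.parse("{}");
            }
            return Document.parse(new String(inputBody, StandardCharsets.UTF_8));
        }

        public static void main(String[] args) {
            final Document query = resolveQuery(null, null); // full-collection fetch
            // Placeholder connection details, not from the commit.
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoCollection<Document> col = client.getDatabase("demo").getCollection("demo");
                // MongoCursor implements Closeable, so try-with-resources replaces
                // the explicit try { ... } finally { cursor.close(); } block that
                // onTrigger used before the refactor.
                try (MongoCursor<Document> cursor = col.find(query).iterator()) {
                    while (cursor.hasNext()) {
                        System.out.println(cursor.next().toJson());
                    }
                }
            }
        }
    }
----------------------------------------------------------------------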
http://git-wip-us.apache.org/repos/asf/nifi/blob/05e32cff/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index aaf6fa3..08bc8cf 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -194,8 +194,8 @@ public class GetMongoIT {
public void testReadMultipleDocuments() throws Exception {
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
runner.run();
-
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
+
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
@@ -313,7 +313,7 @@ public class GetMongoIT {
runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr);
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
- testQueryAttribute(attr, "{}");
+ testQueryAttribute(attr, "{ }");
runner.clearTransferState();
@@ -323,7 +323,7 @@ public class GetMongoIT {
runner.removeProperty(GetMongo.QUERY);
runner.setIncomingConnection(false);
runner.run();
- testQueryAttribute(attr, "{}");
+ testQueryAttribute(attr, "{ }");
runner.clearTransferState();
@@ -334,7 +334,7 @@ public class GetMongoIT {
runner.setIncomingConnection(true);
runner.enqueue("{}");
runner.run();
- testQueryAttribute(attr, "{}");
+ testQueryAttribute(attr, "{ }");
/*
* Input flowfile with invalid query
@@ -478,6 +478,7 @@ public class GetMongoIT {
*/
@Test
public void testDatabaseEL() {
+ runner.clearTransferState();
runner.removeVariable("collection");
runner.removeVariable("db");
runner.setIncomingConnection(true);
@@ -506,26 +507,36 @@ public class GetMongoIT {
}
Map<String, Map<String, String>> vals = new HashMap<String, Map<String, String>>(){{
- put("Database", new HashMap<String, String>(){{
- put("db", "");
- put("collection", "test");
- }});
put("Collection", new HashMap<String, String>(){{
put("db", "getmongotest");
put("collection", "");
}});
+ put("Database", new HashMap<String, String>(){{
+ put("db", "");
+ put("collection", "test");
+ }});
}};
+ TestRunner tmpRunner;
+
for (Map.Entry<String, Map<String, String>> entry : vals.entrySet()) {
- runner.enqueue("{}", entry.getValue());
+ // Creating a new runner for each attribute map since every subsequent run will attempt to take the top-most enqueued FlowFile
+ tmpRunner = TestRunners.newTestRunner(GetMongo.class);
+ tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+ tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
+ tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+ tmpRunner.setIncomingConnection(true);
+
+ tmpRunner.enqueue("{ }", entry.getValue());
+
try {
- runner.run();
+ tmpRunner.run();
} catch (Throwable ex) {
Throwable cause = ex.getCause();
Assert.assertTrue(cause instanceof ProcessException);
- Assert.assertTrue(entry.getKey(), cause.getMessage().contains(entry.getKey()));
+ Assert.assertTrue(entry.getKey(), ex.getMessage().contains(entry.getKey()));
}
- runner.clearTransferState();
+ tmpRunner.clearTransferState();
}
}
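----------------------------------------------------------------------
On the test side, the per-case TestRunner is the notable pattern: TestRunner keeps a FIFO queue of enqueued FlowFiles, and per the comment in the loop above, a run() that fails can leave its FlowFile queued, so a shared runner's next run() would replay the wrong case. A hedged sketch of that pitfall follows; the property values are placeholders standing in for the test's MONGO_URI, DB_NAME, and COLLECTION_NAME constants, and the "${db}"/"${collection}" expressions assume the Expression Language setup used in testDatabaseEL.

    import java.util.Collections;

    import org.apache.nifi.processors.mongodb.AbstractMongoProcessor;
    import org.apache.nifi.processors.mongodb.GetMongo;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class SharedRunnerPitfallSketch {
        public static void main(String[] args) {
            TestRunner shared = TestRunners.newTestRunner(GetMongo.class);
            // Placeholder values, not the test's real constants.
            shared.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017");
            shared.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
            shared.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
            shared.setIncomingConnection(true);

            shared.enqueue("{ }", Collections.singletonMap("db", ""));
            try {
                shared.run(); // expected to fail: blank database name
            } catch (Throwable expected) {
                // The failed run can leave the case-1 FlowFile in the queue.
            }

            shared.enqueue("{ }", Collections.singletonMap("collection", ""));
            shared.run(); // would dequeue the leftover case-1 FlowFile, not case 2's
        }
    }
----------------------------------------------------------------------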