You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/07/03 11:52:23 UTC
nifi git commit: NIFI-5334: GetMongo keeps original attributes when
it has incoming connections
Repository: nifi
Updated Branches:
refs/heads/master df4931280 -> 3164759d7
NIFI-5334: GetMongo keeps original attributes when it has incoming connections
This closes #2815
Signed-off-by: Mike Thomsen <mi...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3164759d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3164759d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3164759d
Branch: refs/heads/master
Commit: 3164759d7ffb8ba5be360796fd4bfc004f2c79c0
Parents: df49312
Author: zenfenan <si...@gmail.com>
Authored: Tue Jun 26 20:01:59 2018 +0530
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Jul 3 07:51:20 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/mongodb/GetMongo.java | 18 +++++++++-------
.../nifi/processors/mongodb/GetMongoIT.java | 22 +++++++++++++++++++-
2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3164759d/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index e744577..c5201fb 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -45,6 +45,7 @@ import org.bson.json.JsonWriterSettings;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -209,6 +210,8 @@ public class GetMongo extends AbstractMongoProcessor {
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
final Document query;
+ final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+
String queryStr;
if (context.getProperty(QUERY).isSet()) {
queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
@@ -268,7 +271,7 @@ public class GetMongo extends AbstractMongoProcessor {
final MongoCursor<Document> cursor = it.iterator();
ComponentLog log = getLogger();
try {
- FlowFile flowFile = null;
+ FlowFile outgoingFlowFile;
if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
List<Document> batch = new ArrayList<>();
@@ -297,21 +300,20 @@ public class GetMongo extends AbstractMongoProcessor {
}
} else {
while (cursor.hasNext()) {
- final FlowFile ffPtr = input;
- flowFile = session.create();
- flowFile = session.write(flowFile, out -> {
+ outgoingFlowFile = (input == null) ? session.create() : session.create(input);
+ outgoingFlowFile = session.write(outgoingFlowFile, out -> {
String json;
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next());
} else {
json = cursor.next().toJson();
}
- out.write(json.getBytes(context.getProperty(CHARSET).evaluateAttributeExpressions(ffPtr).getValue()));
+ out.write(json.getBytes(charset));
});
- flowFile = session.putAllAttributes(flowFile, attributes);
+ outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
- session.getProvenanceReporter().receive(flowFile, getURI(context));
- session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
+ session.transfer(outgoingFlowFile, REL_SUCCESS);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3164759d/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 2c9b6d2..1823edf 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -406,7 +406,6 @@ public class GetMongoIT {
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
-
}
@Test
@@ -445,6 +444,27 @@ public class GetMongoIT {
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
+
+ @Test
+ public void testKeepOriginalAttributes() {
+ final String query = "{ \"c\": { \"$gte\": 4 }}";
+ final Map<String, String> attributesMap = new HashMap<>(1);
+ attributesMap.put("property.1", "value-1");
+
+ runner.setIncomingConnection(true);
+ runner.removeProperty(GetMongo.QUERY);
+ runner.enqueue(query, attributesMap);
+
+ runner.run(1, true, true);
+
+ runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
+ runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
+ runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
+ Assert.assertTrue(flowFile.getAttributes().containsKey("property.1"));
+ flowFile.assertAttributeEquals("property.1", "value-1");
+ }
/*
* End query read behavior tests
*/