You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/06/11 02:20:19 UTC
nifi git commit: NIFI-5288 Quietly convert Java arrays to Lists so
the MongoDB API can handle them.
Repository: nifi
Updated Branches:
refs/heads/master d02cd4f90 -> 5a39d2a81
NIFI-5288 Quietly convert Java arrays to Lists so the MongoDB API can handle them.
NIFI-5288 Made changes based on code review.
NIFI-5288 Theoretically supports nested arrays.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2778
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a39d2a8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a39d2a8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a39d2a8
Branch: refs/heads/master
Commit: 5a39d2a819ab18ee119b4eb63813f2c5aae2a50d
Parents: d02cd4f
Author: Mike Thomsen <mi...@gmail.com>
Authored: Sat Jun 9 15:38:02 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Sun Jun 10 22:10:12 2018 -0400
----------------------------------------------------------------------
.../nifi-mongodb-processors/pom.xml | 17 ++++++++++
.../nifi/processors/mongodb/PutMongoRecord.java | 32 ++++++++++++++++++-
.../processors/mongodb/PutMongoRecordIT.java | 33 ++++++++++++++++++++
3 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/5a39d2a8/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index 32420be..4d02075 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -91,5 +91,22 @@
<version>1.7.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-avro-record-utils</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/5a39d2a8/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 63ebda9..37706bc 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -131,7 +131,7 @@ public class PutMongoRecord extends AbstractMongoProcessor {
for (String name : schema.getFieldNames()) {
document.put(name, contentMap.get(name));
}
- inserts.add(document);
+ inserts.add(convertArrays(document));
if (inserts.size() == ceiling) {
collection.insertMany(inserts);
added += inserts.size();
@@ -154,4 +154,34 @@ public class PutMongoRecord extends AbstractMongoProcessor {
}
session.commit();
}
+
+ /**
+  * Builds a copy of the given document in which every reference-array value is replaced by a
+  * {@link List}, descending into nested maps so arrays at any depth are converted.
+  *
+  * Only {@code Object[]} values are converted. Primitive arrays (for example {@code byte[]})
+  * also report {@code isArray() == true}, but casting them to {@code Object[]} throws a
+  * {@link ClassCastException}, so they are copied through unchanged.
+  *
+  * @param doc the document to copy; this method only reads from it
+  * @return a new document holding the same keys with array values converted to lists
+  */
+ @SuppressWarnings("unchecked")
+ private Document convertArrays(Document doc) {
+     Document retVal = new Document();
+     for (Map.Entry<String, Object> entry : doc.entrySet()) {
+         Object value = entry.getValue();
+         // instanceof is false for null and for primitive arrays, so the cast below is always safe.
+         if (value instanceof Object[]) {
+             retVal.put(entry.getKey(), convertArrays((Object[]) value));
+         } else if (value instanceof Map) {
+             // Document is itself a Map, so nested Documents take this branch too.
+             // NOTE(review): assumes nested maps are keyed by String - confirm against the record reader.
+             retVal.put(entry.getKey(), convertArrays(new Document((Map<String, Object>) value)));
+         } else {
+             retVal.put(entry.getKey(), value);
+         }
+     }
+
+     return retVal;
+ }
+
+ /**
+  * Converts a reference array into a {@link List}, converting nested arrays and maps on the way
+  * so that arrays at any depth end up as lists.
+  *
+  * Elements that are primitive arrays (for example {@code byte[]}) are added unchanged: they
+  * cannot be cast to {@code Object[]} without a {@link ClassCastException}.
+  *
+  * @param input the array to convert; must not be {@code null}
+  * @return a new list with one entry per element of {@code input}, in the same order
+  */
+ @SuppressWarnings("unchecked")
+ private List<Object> convertArrays(Object[] input) {
+     List<Object> retVal = new ArrayList<>(input.length);
+     for (Object o : input) {
+         // instanceof is false for null and for primitive arrays, so the cast below is always safe.
+         if (o instanceof Object[]) {
+             retVal.add(convertArrays((Object[]) o));
+         } else if (o instanceof Map) {
+             // NOTE(review): assumes nested maps are keyed by String - confirm against the record reader.
+             retVal.add(convertArrays(new Document((Map<String, Object>) o)));
+         } else {
+             retVal.add(o);
+         }
+     }
+
+     return retVal;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5a39d2a8/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index 5332695..2a24b32 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -17,12 +17,17 @@
package org.apache.nifi.processors.mongodb;
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -43,6 +48,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -191,4 +197,31 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
assertEquals(4, collection.count());
//assertEquals(doc, collection.find().first());
}
+
+ /**
+  * Sends a JSON record containing a string array through PutMongoRecord and checks that the
+  * flowfile is routed to success rather than failure.
+  */
+ @Test
+ public void testArrayConversion() throws Exception {
+     final TestRunner runner = init(PutMongoRecord.class);
+
+     // Flowfile attribute naming the schema registered below under "test".
+     final Map<String, String> flowFileAttributes = new HashMap<>();
+     flowFileAttributes.put("schema.name", "test");
+
+     // Avro schema with a string field and a field holding an array of strings.
+     final String avroSchemaJson = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}," +
+         "{\"name\":\"arrayTest\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+     final RecordSchema recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(avroSchemaJson));
+
+     final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
+     schemaRegistry.addSchema("test", recordSchema);
+     final JsonTreeReader jsonReader = new JsonTreeReader();
+
+     // Register both services, wire the reader to the registry, then enable them.
+     runner.addControllerService("registry", schemaRegistry);
+     runner.addControllerService("reader", jsonReader);
+     runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+     runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader");
+     runner.enableControllerService(schemaRegistry);
+     runner.enableControllerService(jsonReader);
+     runner.assertValid();
+
+     runner.enqueue("{\"name\":\"John Smith\",\"arrayTest\":[\"a\",\"b\",\"c\"]}", flowFileAttributes);
+     runner.run();
+
+     // The single input flowfile must reach success and nothing may reach failure.
+     runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0);
+     runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1);
+ }
}