You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/06/11 02:20:19 UTC

nifi git commit: NIFI-5288 Quietly convert Java arrays to Lists so the MongoDB API can handle them.

Repository: nifi
Updated Branches:
  refs/heads/master d02cd4f90 -> 5a39d2a81


NIFI-5288 Quietly convert Java arrays to Lists so the MongoDB API can handle them.

NIFI-5288 Made changes based on code review.

NIFI-5288 Theoretically supports nested arrays.

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2778


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a39d2a8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a39d2a8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a39d2a8

Branch: refs/heads/master
Commit: 5a39d2a819ab18ee119b4eb63813f2c5aae2a50d
Parents: d02cd4f
Author: Mike Thomsen <mi...@gmail.com>
Authored: Sat Jun 9 15:38:02 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Sun Jun 10 22:10:12 2018 -0400

----------------------------------------------------------------------
 .../nifi-mongodb-processors/pom.xml             | 17 ++++++++++
 .../nifi/processors/mongodb/PutMongoRecord.java | 32 ++++++++++++++++++-
 .../processors/mongodb/PutMongoRecordIT.java    | 33 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5a39d2a8/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index 32420be..4d02075 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -91,5 +91,22 @@
             <version>1.7.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a39d2a8/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 63ebda9..37706bc 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -131,7 +131,7 @@ public class PutMongoRecord extends AbstractMongoProcessor {
                 for (String name : schema.getFieldNames()) {
                     document.put(name, contentMap.get(name));
                 }
-                inserts.add(document);
+                inserts.add(convertArrays(document));
                 if (inserts.size() == ceiling) {
                     collection.insertMany(inserts);
                     added += inserts.size();
@@ -154,4 +154,34 @@ public class PutMongoRecord extends AbstractMongoProcessor {
         }
         session.commit();
     }
+
+    private Document convertArrays(Document doc) {
+        Document retVal = new Document();
+        for (Map.Entry<String, Object> entry : doc.entrySet()) {
+            if (entry.getValue() != null && entry.getValue().getClass().isArray()) {
+                retVal.put(entry.getKey(), convertArrays((Object[])entry.getValue()));
+            } else if (entry.getValue() != null && (entry.getValue() instanceof Map || entry.getValue() instanceof Document)) {
+                retVal.put(entry.getKey(), convertArrays(new Document((Map)entry.getValue())));
+            } else {
+                retVal.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        return retVal;
+    }
+
+    private List convertArrays(Object[] input) {
+        List retVal = new ArrayList();
+        for (Object o : input) {
+            if (o != null && o.getClass().isArray()) {
+                retVal.add(convertArrays((Object[])o));
+            } else if (o instanceof Map) {
+                retVal.add(convertArrays(new Document((Map)o)));
+            } else {
+                retVal.add(o);
+            }
+        }
+
+        return retVal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a39d2a8/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index 5332695..2a24b32 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -17,12 +17,17 @@
 package org.apache.nifi.processors.mongodb;
 
 
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -43,6 +48,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -191,4 +197,31 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         assertEquals(4, collection.count());
         //assertEquals(doc, collection.find().first());
     }
+
+    @Test
+    public void testArrayConversion() throws Exception {
+        TestRunner runner = init(PutMongoRecord.class);
+        MockSchemaRegistry registry = new MockSchemaRegistry();
+        String rawSchema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}," +
+                "{\"name\":\"arrayTest\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+        RecordSchema schema = AvroTypeUtil.createSchema(new Schema.Parser().parse(rawSchema));
+        registry.addSchema("test", schema);
+        JsonTreeReader reader = new JsonTreeReader();
+        runner.addControllerService("registry", registry);
+        runner.addControllerService("reader", reader);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+        runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader");
+        runner.enableControllerService(registry);
+        runner.enableControllerService(reader);
+        runner.assertValid();
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("schema.name", "test");
+
+        runner.enqueue("{\"name\":\"John Smith\",\"arrayTest\":[\"a\",\"b\",\"c\"]}", attrs);
+        runner.run();
+
+        runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1);
+    }
 }