You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by je...@apache.org on 2018/01/31 16:35:46 UTC

nifi git commit: NIFI-4823 Made pretty printing configurable in GetMongo.

Repository: nifi
Updated Branches:
  refs/heads/master d8cfb8e6c -> 6e7dfb993


NIFI-4823 Made pretty printing configurable in GetMongo.

This closes #2441

Signed-off-by: Jeremy Dyer <je...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6e7dfb99
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6e7dfb99
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6e7dfb99

Branch: refs/heads/master
Commit: 6e7dfb9935622a5215115df93503ef4bfba75949
Parents: d8cfb8e
Author: Mike Thomsen <mi...@gmail.com>
Authored: Mon Jan 29 06:44:14 2018 -0500
Committer: Jeremy Dyer <je...@apache.org>
Committed: Wed Jan 31 11:34:31 2018 -0500

----------------------------------------------------------------------
 .../nifi/processors/mongodb/GetMongo.java       | 46 ++++++++++++++------
 .../nifi/processors/mongodb/GetMongoTest.java   | 21 +++++++++
 2 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6e7dfb99/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 8988245..5fa4550 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -19,6 +19,7 @@
 package org.apache.nifi.processors.mongodb;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -126,6 +127,19 @@ public class GetMongo extends AbstractMongoProcessor {
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
 
+    static final AllowableValue YES_PP = new AllowableValue("true", "True"); // stored value "true", displayed as "True"
+    static final AllowableValue NO_PP  = new AllowableValue("false", "False"); // stored value "false", displayed as "False"
+    static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder() // in this patch only the Standard JSON path reads it
+        .name("use-pretty-printing")
+        .displayName("Pretty Print Results JSON")
+        .description("Choose whether or not to pretty print the JSON from the results of the query. " +
+                "Choosing True can greatly increase the space requirements on disk depending on the complexity of the JSON document.")
+        .required(true)
+        .defaultValue(YES_PP.getValue()) // pretty printing stays on unless explicitly disabled
+        .allowableValues(YES_PP, NO_PP)
+        .addValidator(Validator.VALID)
+        .build();
+
     static final String JSON_TYPE_EXTENDED = "Extended";
     static final String JSON_TYPE_STANDARD   = "Standard";
     static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
@@ -151,6 +165,7 @@ public class GetMongo extends AbstractMongoProcessor {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
         _propertyDescriptors.add(JSON_TYPE);
+        _propertyDescriptors.add(USE_PRETTY_PRINTING);
         _propertyDescriptors.add(QUERY);
         _propertyDescriptors.add(PROJECTION);
         _propertyDescriptors.add(SORT);
@@ -179,13 +194,13 @@ public class GetMongo extends AbstractMongoProcessor {
     private ObjectMapper mapper;
 
     //Turn a list of Mongo result documents into a String representation of a JSON array
-    private String buildBatch(List<Document> documents, String jsonTypeSetting) throws IOException {
+    private String buildBatch(List<Document> documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException {
         StringBuilder builder = new StringBuilder();
         for (int index = 0; index < documents.size(); index++) {
             Document document = documents.get(index);
             String asJson;
             if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
-                asJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(document);
+                asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document);
             } else {
                 asJson = document.toJson(new JsonWriterSettings(true));
             }
@@ -206,6 +221,11 @@ public class GetMongo extends AbstractMongoProcessor {
         }
     }
 
+    private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
+        final boolean indent = ppSetting.equals(YES_PP.getValue()); // any other value selects the plain writer
+        return indent ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer();
+    }
+
     private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
         FlowFile flowFile = session.create();
         flowFile = session.write(flowFile, new OutputStreamCallback() {
@@ -230,6 +250,7 @@ public class GetMongo extends AbstractMongoProcessor {
         final Document sort = context.getProperty(SORT).isSet()
                 ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null;
         final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
+        final String usePrettyPrint  = context.getProperty(USE_PRETTY_PRINTING).getValue();
         configureMapper(jsonTypeSetting);
 
 
@@ -265,7 +286,7 @@ public class GetMongo extends AbstractMongoProcessor {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Writing batch...");
                                 }
-                                String payload = buildBatch(batch, jsonTypeSetting);
+                                String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
                                 writeBatch(payload, context, session);
                                 batch = new ArrayList<>();
                             } catch (IOException ex) {
@@ -275,7 +296,7 @@ public class GetMongo extends AbstractMongoProcessor {
                     }
                     if (batch.size() > 0) {
                         try {
-                            writeBatch(buildBatch(batch, jsonTypeSetting), context, session);
+                            writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), context, session);
                         } catch (IOException ex) {
                             getLogger().error("Error sending remainder of batch", ex);
                         }
@@ -283,17 +304,14 @@ public class GetMongo extends AbstractMongoProcessor {
                 } else {
                     while (cursor.hasNext()) {
                         flowFile = session.create();
-                        flowFile = session.write(flowFile, new OutputStreamCallback() {
-                            @Override
-                            public void process(OutputStream out) throws IOException {
-                                String json;
-                                if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
-                                    json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(cursor.next());
-                                } else {
-                                    json = cursor.next().toJson();
-                                }
-                                IOUtils.write(json, out);
+                        flowFile = session.write(flowFile, out -> {
+                            String json;
+                            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
+                                json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next());
+                            } else {
+                                json = cursor.next().toJson();
                             }
+                            IOUtils.write(json, out);
                         });
                         flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e7dfb99/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
index 2de0c97..0e16813 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
@@ -75,6 +75,7 @@ public class GetMongoTest {
         runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
         runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
         runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
+        runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
 
         mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
 
@@ -259,4 +260,24 @@ public class GetMongoTest {
         Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
         Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");
     }
+
+    @Test
+    public void testConfigurablePrettyPrint() {
+        // First pass: with pretty printing enabled (YES_PP) the standard JSON must span several lines.
+        runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
+        runner.setProperty(GetMongo.LIMIT, "1");
+        runner.run();
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+        List<MockFlowFile> transferred = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        String json = new String(runner.getContentAsByteArray(transferred.get(0)));
+        Assert.assertTrue("JSON did not have new lines.", json.contains("\n"));
+        // Second pass: same query with NO_PP must come back without any line breaks.
+        runner.clearTransferState();
+        runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP);
+        runner.run();
+        runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
+        transferred = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        json = new String(runner.getContentAsByteArray(transferred.get(0)));
+        Assert.assertFalse("New lines detected", json.contains("\n"));
+    }
 }