You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/05/20 20:04:42 UTC

[nifi] branch main updated: NIFI-8613: Improve FlattenJson Processor

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c113960  NIFI-8613: Improve FlattenJson Processor
c113960 is described below

commit c113960b81eff7613ac6d0f6a6e3a6cba10b95f4
Author: Mohammed Nadeem <na...@gmail.com>
AuthorDate: Tue May 18 20:16:04 2021 +0530

    NIFI-8613: Improve FlattenJson Processor
    
    - Unflattening a previously flattened JSON document
    - Preserving arrays of primitives (strings, numbers, booleans and null) in nested JSON
    - Logging errors on failure
    - Pretty-printing the resulting JSON
    
    This closes #5083
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi-standard-processors/pom.xml               |   2 +-
 .../nifi/processors/standard/FlattenJson.java      | 137 ++++++++++++-----
 .../processors/standard/TestFlattenJson.groovy     | 165 +++++++++++++++++++++
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |   4 +-
 4 files changed, 266 insertions(+), 42 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 9451fba..43ce711 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -331,7 +331,7 @@
             <artifactId>ParCEFone</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.github.wnameless</groupId>
+            <groupId>com.github.wnameless.json</groupId>
             <artifactId>json-flattener</artifactId>
         </dependency>
         <dependency>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
index 917f905..d85ea16 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
@@ -20,6 +20,9 @@ package org.apache.nifi.processors.standard;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.wnameless.json.flattener.FlattenMode;
 import com.github.wnameless.json.flattener.JsonFlattener;
+import com.github.wnameless.json.flattener.PrintMode;
+import com.github.wnameless.json.unflattener.JsonUnflattener;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.text.StringEscapeUtils;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -40,9 +43,10 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -54,24 +58,32 @@ import java.util.Set;
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @SideEffectFree
-@Tags({"json", "flatten"})
+@Tags({"json", "flatten", "unflatten"})
 @CapabilityDescription(
         "Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " +
                 "document. The keys are combined at each level with a user-defined separator that defaults to '.'. " +
-                "Support three kinds of flatten mode, normal, keep-arrays and dot notation for MongoDB query. " +
-                "Default flatten mode is 'keep-arrays'."
+                "This Processor also allows to unflatten back the flattened json. It supports four kinds of flatten mode " +
+                "such as normal, keep-arrays, dot notation for MongoDB query and keep-primitive-arrays. Default flatten mode " +
+                "is 'keep-arrays'."
 )
 public class FlattenJson extends AbstractProcessor {
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .description("Successfully flattened files go to this relationship.")
-            .name("success")
-            .build();
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-            .description("Files that cannot be flattened go to this relationship.")
-            .name("failure")
-            .build();
 
-    static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
+    public static final String RETURN_TYPE_FLATTEN = "flatten";
+    public static final String RETURN_TYPE_UNFLATTEN = "unflatten";
+
+    public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
+            "Flattens every objects into a single level json");
+
+    public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
+            "Flattens every objects and keep arrays format");
+
+    public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
+            "Conforms to MongoDB dot notation to update also nested documents");
+
+    public static final AllowableValue FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS = new AllowableValue("keep primitive arrays", "keep primitive arrays",
+            "Flattens every objects except arrays which contain only primitive types (strings, numbers, booleans and null)");
+
+    public static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
             .name("flatten-json-separator")
             .displayName("Separator")
             .defaultValue(".")
@@ -101,25 +113,56 @@ public class FlattenJson extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
-    public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
-            "Flattens every objects into a single level json");
-
-    public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
-            "Flattens every objects and keep arrays format");
-
-    public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
-            "Conforms to MongoDB dot notation to update also nested documents");
-
     public static final PropertyDescriptor FLATTEN_MODE = new PropertyDescriptor.Builder()
             .name("flatten-mode")
             .displayName("Flatten Mode")
-            .description("Specifies how json is flattened")
+            .description("Specifies how json should be flattened/unflattened")
             .defaultValue(FLATTEN_MODE_KEEP_ARRAYS.getValue())
             .required(true)
-            .allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION)
+            .allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION, FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
+            .name("flatten-json-return-type")
+            .displayName("Return Type")
+            .description("Specifies the desired return type of json such as flatten/unflatten")
+            .defaultValue(RETURN_TYPE_FLATTEN)
+            .required(true)
+            .allowableValues(RETURN_TYPE_FLATTEN, RETURN_TYPE_UNFLATTEN)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+            .name("flatten-json-character-set")
+            .displayName("Character Set")
+            .description("The Character Set in which file is encoded")
+            .defaultValue("UTF-8")
+            .required(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+            .name("flatten-json-pretty-print-json")
+            .displayName("Pretty Print JSON")
+            .description("Specifies whether or not resulted json should be pretty printed")
+            .defaultValue("false")
+            .required(true)
+            .allowableValues("true", "false")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .description("Successfully flattened/unflattened files go to this relationship.")
+            .name("success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .description("Files that cannot be flattened/unflattened go to this relationship.")
+            .name("failure")
+            .build();
+
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
@@ -128,6 +171,9 @@ public class FlattenJson extends AbstractProcessor {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(SEPARATOR);
         props.add(FLATTEN_MODE);
+        props.add(RETURN_TYPE);
+        props.add(CHARACTER_SET);
+        props.add(PRETTY_PRINT);
         properties = Collections.unmodifiableList(props);
 
         Set<Relationship> rels = new HashSet<>();
@@ -157,25 +203,36 @@ public class FlattenJson extends AbstractProcessor {
         final String mode = context.getProperty(FLATTEN_MODE).getValue();
         final FlattenMode flattenMode = getFlattenMode(mode);
 
-        String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
-
+        final Character separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue().charAt(0);
+        final String returnType = context.getProperty(RETURN_TYPE).getValue();
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final PrintMode printMode = context.getProperty(PRETTY_PRINT).asBoolean() ? PrintMode.PRETTY : PrintMode.MINIMAL;
 
         try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            session.exportTo(flowFile, bos);
-            bos.close();
-
-            String raw = new String(bos.toByteArray());
-            final String flattened = new JsonFlattener(raw)
-                    .withFlattenMode(flattenMode)
-                    .withSeparator(separator.charAt(0))
-                    .withStringEscapePolicy(() -> StringEscapeUtils.ESCAPE_JSON)
-                    .flatten();
-
-            flowFile = session.write(flowFile, os -> os.write(flattened.getBytes()));
+            final StringBuilder contents = new StringBuilder();
+            session.read(flowFile, in -> contents.append(IOUtils.toString(in, charset)));
+
+            final String resultedJson;
+            if (returnType.equals(RETURN_TYPE_FLATTEN)) {
+                resultedJson = new JsonFlattener(contents.toString())
+                        .withFlattenMode(flattenMode)
+                        .withSeparator(separator)
+                        .withStringEscapePolicy(() -> StringEscapeUtils.ESCAPE_JSON)
+                        .withPrintMode(printMode)
+                        .flatten();
+            } else {
+                resultedJson = new JsonUnflattener(contents.toString())
+                        .withFlattenMode(flattenMode)
+                        .withSeparator(separator)
+                        .withPrintMode(printMode)
+                        .unflatten();
+            }
+
+            flowFile = session.write(flowFile, out -> out.write(resultedJson.getBytes(charset)));
 
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (Exception ex) {
+        } catch (Exception e) {
+            getLogger().error("Failed to {} JSON", returnType, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
@@ -185,6 +242,8 @@ public class FlattenJson extends AbstractProcessor {
             return FlattenMode.NORMAL;
         } else if (FLATTEN_MODE_DOT_NOTATION.getValue().equals(mode)) {
             return FlattenMode.MONGODB;
+        } else if (FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS.getValue().equals(mode)) {
+            return FlattenMode.KEEP_PRIMITIVE_ARRAYS;
         } else {
             return FlattenMode.KEEP_ARRAYS;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
index 24604a2..ce72c84 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
@@ -155,6 +155,77 @@ class TestFlattenJson {
     }
 
     @Test
+    void testFlattenModeKeepArrays() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                first: [
+                        second: [
+                                [
+                                        x: 1,
+                                        y: 2,
+                                        z: [3, 4, 5]
+                                ],
+                                [ 6, 7, 8],
+                                [
+                                        [9, 10],
+                                        11,
+                                        12
+                                ]
+                        ],
+                        "third" : [
+                                a: "b",
+                                c: "d",
+                                e: "f"
+                        ]
+                ]
+        ]))
+
+        testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_ARRAYS)
+        baseTest(testRunner, json,4) { parsed ->
+            assert parsed["first.second"] instanceof List   // [{x=1, y=2, z=[3, 4, 5]}, [6, 7, 8], [[9, 10], 11, 12]]
+            assert parsed["first.second"][1] == [6, 7, 8]
+            Assert.assertEquals("Separator not applied.", "b", parsed["first.third.a"])
+        }
+    }
+
+    @Test
+    void testFlattenModeKeepPrimitiveArrays() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                first: [
+                        second: [
+                                [
+                                        x: 1,
+                                        y: 2,
+                                        z: [3, 4, 5]
+                                ],
+                                [ 6, 7, 8],
+                                [
+                                        [9, 10],
+                                        11,
+                                        12
+                                ]
+                        ],
+                        "third" : [
+                                a: "b",
+                                c: "d",
+                                e: "f"
+                        ]
+                ]
+        ]))
+
+        testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
+        baseTest(testRunner, json,10) { parsed ->
+            Assert.assertEquals("Separator not applied.", 1, parsed["first.second[0].x"])
+            Assert.assertEquals("Separator not applied.", [3, 4, 5], parsed["first.second[0].z"])
+            Assert.assertEquals("Separator not applied.", [9, 10], parsed["first.second[2][0]"])
+            Assert.assertEquals("Separator not applied.", 11, parsed["first.second[2][1]"])
+            Assert.assertEquals("Separator not applied.", 12, parsed["first.second[2][2]"])
+            Assert.assertEquals("Separator not applied.", "d", parsed["first.third.c"])
+        }
+    }
+
+    @Test
     void testFlattenModeDotNotation() {
         def testRunner = TestRunners.newTestRunner(FlattenJson.class)
         def json = prettyPrint(toJson([
@@ -203,4 +274,98 @@ class TestFlattenJson {
             Assert.assertEquals("Separator not applied.", "José", parsed["name"])
         }
     }
+
+    @Test
+    void testUnFlatten() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                "test.msg": "Hello, world",
+                "first.second.third":  [ "one", "two", "three", "four", "five" ]
+        ]))
+
+        testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+        baseTest(testRunner, json, 2) { parsed ->
+            assert parsed.test instanceof Map
+            assert parsed.test.msg == "Hello, world"
+            assert parsed.first.second.third == [ "one", "two", "three", "four", "five" ]
+        }
+    }
+
+    @Test
+    void testUnFlattenWithDifferentSeparator() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                "first_second_third":  [ "one", "two", "three", "four", "five" ]
+        ]))
+
+        testRunner.setProperty(FlattenJson.SEPARATOR, "_")
+        testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+        baseTest(testRunner, json, 1) { parsed ->
+            assert parsed.first instanceof Map
+            assert parsed.first.second.third == [ "one", "two", "three", "four", "five" ]
+        }
+    }
+
+    @Test
+    void testUnFlattenForKeepArraysMode() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                "a.b": 1,
+                "a.c": [
+                        false,
+                        ["i.j": [ false, true, "xy" ] ]
+                ]
+        ]))
+
+        testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_ARRAYS)
+        testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+        baseTest(testRunner, json, 1) { parsed ->
+            assert parsed.a instanceof Map
+            assert parsed.a.b == 1
+            assert parsed.a.c[0] == false
+            assert parsed.a.c[1].i instanceof Map
+            assert parsed.a.c[1].i.j == [false, true, "xy"]
+        }
+    }
+
+    @Test
+    void testUnFlattenForKeepPrimitiveArraysMode() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                "first.second[0].x": 1,
+                "first.second[0].y": 2,
+                "first.second[0].z": [3, 4, 5],
+                "first.second[1]": [6, 7, 8],
+                "first.second[2][0]": [9, 10],
+                "first.second[2][1]": 11,
+                "first.second[2][2]": 12,
+                "first.third.a": "b",
+                "first.third.c": "d",
+                "first.third.e": "f"
+        ]))
+
+        testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
+        testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+        baseTest(testRunner, json, 1) { parsed ->
+            assert parsed.first instanceof Map
+            assert parsed.first.second[0].x == 1
+            assert parsed.first.second[2][0] == [9, 10]
+            assert parsed.first.third.c == "d"
+        }
+    }
+
+    @Test
+    void testUnFlattenForDotNotationMode() {
+        def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+        def json = prettyPrint(toJson([
+                "first.second.third.0": ["one", "two", "three", "four", "five"]
+        ]))
+
+        testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_DOT_NOTATION)
+        testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+        baseTest(testRunner, json,1) { parsed ->
+            assert parsed.first instanceof Map
+            assert parsed.first.second.third[0] == ["one", "two", "three", "four", "five"]
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index a55771f..670eb92 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -305,9 +305,9 @@
                 <version>1.2.8</version>
             </dependency>
             <dependency>
-                <groupId>com.github.wnameless</groupId>
+                <groupId>com.github.wnameless.json</groupId>
                 <artifactId>json-flattener</artifactId>
-                <version>0.7.1</version>
+                <version>0.12.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.bval</groupId>