You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/05/20 20:04:42 UTC
[nifi] branch main updated: NIFI-8613: Improve FlattenJson Processor
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c113960 NIFI-8613: Improve FlattenJson Processor
c113960 is described below
commit c113960b81eff7613ac6d0f6a6e3a6cba10b95f4
Author: Mohammed Nadeem <na...@gmail.com>
AuthorDate: Tue May 18 20:16:04 2021 +0530
NIFI-8613: Improve FlattenJson Processor
- Unflattening a flattened json
- Preserving primitive arrays such as strings, numbers, booleans and null in a nested json
- Logging errors on failure
- Pretty printing the resulting json
This closes #5083
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi-standard-processors/pom.xml | 2 +-
.../nifi/processors/standard/FlattenJson.java | 137 ++++++++++++-----
.../processors/standard/TestFlattenJson.groovy | 165 +++++++++++++++++++++
nifi-nar-bundles/nifi-standard-bundle/pom.xml | 4 +-
4 files changed, 266 insertions(+), 42 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 9451fba..43ce711 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -331,7 +331,7 @@
<artifactId>ParCEFone</artifactId>
</dependency>
<dependency>
- <groupId>com.github.wnameless</groupId>
+ <groupId>com.github.wnameless.json</groupId>
<artifactId>json-flattener</artifactId>
</dependency>
<dependency>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
index 917f905..d85ea16 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FlattenJson.java
@@ -20,6 +20,9 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.wnameless.json.flattener.FlattenMode;
import com.github.wnameless.json.flattener.JsonFlattener;
+import com.github.wnameless.json.flattener.PrintMode;
+import com.github.wnameless.json.unflattener.JsonUnflattener;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -40,9 +43,10 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -54,24 +58,32 @@ import java.util.Set;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@SideEffectFree
-@Tags({"json", "flatten"})
+@Tags({"json", "flatten", "unflatten"})
@CapabilityDescription(
"Provides the user with the ability to take a nested JSON document and flatten it into a simple key/value pair " +
"document. The keys are combined at each level with a user-defined separator that defaults to '.'. " +
- "Support three kinds of flatten mode, normal, keep-arrays and dot notation for MongoDB query. " +
- "Default flatten mode is 'keep-arrays'."
+ "This Processor also allows to unflatten back the flattened json. It supports four kinds of flatten mode " +
+ "such as normal, keep-arrays, dot notation for MongoDB query and keep-primitive-arrays. Default flatten mode " +
+ "is 'keep-arrays'."
)
public class FlattenJson extends AbstractProcessor {
- static final Relationship REL_SUCCESS = new Relationship.Builder()
- .description("Successfully flattened files go to this relationship.")
- .name("success")
- .build();
- static final Relationship REL_FAILURE = new Relationship.Builder()
- .description("Files that cannot be flattened go to this relationship.")
- .name("failure")
- .build();
- static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
+ public static final String RETURN_TYPE_FLATTEN = "flatten";
+ public static final String RETURN_TYPE_UNFLATTEN = "unflatten";
+
+ public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
+ "Flattens every objects into a single level json");
+
+ public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
+ "Flattens every objects and keep arrays format");
+
+ public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
+ "Conforms to MongoDB dot notation to update also nested documents");
+
+ public static final AllowableValue FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS = new AllowableValue("keep primitive arrays", "keep primitive arrays",
+ "Flattens every objects except arrays which contain only primitive types (strings, numbers, booleans and null)");
+
+ public static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder()
.name("flatten-json-separator")
.displayName("Separator")
.defaultValue(".")
@@ -101,25 +113,56 @@ public class FlattenJson extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- public static final AllowableValue FLATTEN_MODE_NORMAL = new AllowableValue("normal", "normal",
- "Flattens every objects into a single level json");
-
- public static final AllowableValue FLATTEN_MODE_KEEP_ARRAYS = new AllowableValue("keep arrays", "keep arrays",
- "Flattens every objects and keep arrays format");
-
- public static final AllowableValue FLATTEN_MODE_DOT_NOTATION = new AllowableValue("dot notation", "dot notation",
- "Conforms to MongoDB dot notation to update also nested documents");
-
public static final PropertyDescriptor FLATTEN_MODE = new PropertyDescriptor.Builder()
.name("flatten-mode")
.displayName("Flatten Mode")
- .description("Specifies how json is flattened")
+ .description("Specifies how json should be flattened/unflattened")
.defaultValue(FLATTEN_MODE_KEEP_ARRAYS.getValue())
.required(true)
- .allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION)
+ .allowableValues(FLATTEN_MODE_NORMAL, FLATTEN_MODE_KEEP_ARRAYS, FLATTEN_MODE_DOT_NOTATION, FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
+ .name("flatten-json-return-type")
+ .displayName("Return Type")
+ .description("Specifies the desired return type of json such as flatten/unflatten")
+ .defaultValue(RETURN_TYPE_FLATTEN)
+ .required(true)
+ .allowableValues(RETURN_TYPE_FLATTEN, RETURN_TYPE_UNFLATTEN)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+ .name("flatten-json-character-set")
+ .displayName("Character Set")
+ .description("The Character Set in which file is encoded")
+ .defaultValue("UTF-8")
+ .required(true)
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor PRETTY_PRINT = new PropertyDescriptor.Builder()
+ .name("flatten-json-pretty-print-json")
+ .displayName("Pretty Print JSON")
+ .description("Specifies whether or not resulted json should be pretty printed")
+ .defaultValue("false")
+ .required(true)
+ .allowableValues("true", "false")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .description("Successfully flattened/unflattened files go to this relationship.")
+ .name("success")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .description("Files that cannot be flattened/unflattened go to this relationship.")
+ .name("failure")
+ .build();
+
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@@ -128,6 +171,9 @@ public class FlattenJson extends AbstractProcessor {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEPARATOR);
props.add(FLATTEN_MODE);
+ props.add(RETURN_TYPE);
+ props.add(CHARACTER_SET);
+ props.add(PRETTY_PRINT);
properties = Collections.unmodifiableList(props);
Set<Relationship> rels = new HashSet<>();
@@ -157,25 +203,36 @@ public class FlattenJson extends AbstractProcessor {
final String mode = context.getProperty(FLATTEN_MODE).getValue();
final FlattenMode flattenMode = getFlattenMode(mode);
- String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
-
+ final Character separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(flowFile).getValue().charAt(0);
+ final String returnType = context.getProperty(RETURN_TYPE).getValue();
+ final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+ final PrintMode printMode = context.getProperty(PRETTY_PRINT).asBoolean() ? PrintMode.PRETTY : PrintMode.MINIMAL;
try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- session.exportTo(flowFile, bos);
- bos.close();
-
- String raw = new String(bos.toByteArray());
- final String flattened = new JsonFlattener(raw)
- .withFlattenMode(flattenMode)
- .withSeparator(separator.charAt(0))
- .withStringEscapePolicy(() -> StringEscapeUtils.ESCAPE_JSON)
- .flatten();
-
- flowFile = session.write(flowFile, os -> os.write(flattened.getBytes()));
+ final StringBuilder contents = new StringBuilder();
+ session.read(flowFile, in -> contents.append(IOUtils.toString(in, charset)));
+
+ final String resultedJson;
+ if (returnType.equals(RETURN_TYPE_FLATTEN)) {
+ resultedJson = new JsonFlattener(contents.toString())
+ .withFlattenMode(flattenMode)
+ .withSeparator(separator)
+ .withStringEscapePolicy(() -> StringEscapeUtils.ESCAPE_JSON)
+ .withPrintMode(printMode)
+ .flatten();
+ } else {
+ resultedJson = new JsonUnflattener(contents.toString())
+ .withFlattenMode(flattenMode)
+ .withSeparator(separator)
+ .withPrintMode(printMode)
+ .unflatten();
+ }
+
+ flowFile = session.write(flowFile, out -> out.write(resultedJson.getBytes(charset)));
session.transfer(flowFile, REL_SUCCESS);
- } catch (Exception ex) {
+ } catch (Exception e) {
+ getLogger().error("Failed to {} JSON", returnType, e);
session.transfer(flowFile, REL_FAILURE);
}
}
@@ -185,6 +242,8 @@ public class FlattenJson extends AbstractProcessor {
return FlattenMode.NORMAL;
} else if (FLATTEN_MODE_DOT_NOTATION.getValue().equals(mode)) {
return FlattenMode.MONGODB;
+ } else if (FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS.getValue().equals(mode)) {
+ return FlattenMode.KEEP_PRIMITIVE_ARRAYS;
} else {
return FlattenMode.KEEP_ARRAYS;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
index 24604a2..ce72c84 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestFlattenJson.groovy
@@ -155,6 +155,77 @@ class TestFlattenJson {
}
@Test
+ void testFlattenModeKeepArrays() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ first: [
+ second: [
+ [
+ x: 1,
+ y: 2,
+ z: [3, 4, 5]
+ ],
+ [ 6, 7, 8],
+ [
+ [9, 10],
+ 11,
+ 12
+ ]
+ ],
+ "third" : [
+ a: "b",
+ c: "d",
+ e: "f"
+ ]
+ ]
+ ]))
+
+ testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_ARRAYS)
+ baseTest(testRunner, json,4) { parsed ->
+ assert parsed["first.second"] instanceof List // [{x=1, y=2, z=[3, 4, 5]}, [6, 7, 8], [[9, 10], 11, 12]]
+ assert parsed["first.second"][1] == [6, 7, 8]
+ Assert.assertEquals("Separator not applied.", "b", parsed["first.third.a"])
+ }
+ }
+
+ @Test
+ void testFlattenModeKeepPrimitiveArrays() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ first: [
+ second: [
+ [
+ x: 1,
+ y: 2,
+ z: [3, 4, 5]
+ ],
+ [ 6, 7, 8],
+ [
+ [9, 10],
+ 11,
+ 12
+ ]
+ ],
+ "third" : [
+ a: "b",
+ c: "d",
+ e: "f"
+ ]
+ ]
+ ]))
+
+ testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
+ baseTest(testRunner, json,10) { parsed ->
+ Assert.assertEquals("Separator not applied.", 1, parsed["first.second[0].x"])
+ Assert.assertEquals("Separator not applied.", [3, 4, 5], parsed["first.second[0].z"])
+ Assert.assertEquals("Separator not applied.", [9, 10], parsed["first.second[2][0]"])
+ Assert.assertEquals("Separator not applied.", 11, parsed["first.second[2][1]"])
+ Assert.assertEquals("Separator not applied.", 12, parsed["first.second[2][2]"])
+ Assert.assertEquals("Separator not applied.", "d", parsed["first.third.c"])
+ }
+ }
+
+ @Test
void testFlattenModeDotNotation() {
def testRunner = TestRunners.newTestRunner(FlattenJson.class)
def json = prettyPrint(toJson([
@@ -203,4 +274,98 @@ class TestFlattenJson {
Assert.assertEquals("Separator not applied.", "José", parsed["name"])
}
}
+
+ @Test
+ void testUnFlatten() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ "test.msg": "Hello, world",
+ "first.second.third": [ "one", "two", "three", "four", "five" ]
+ ]))
+
+ testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+ baseTest(testRunner, json, 2) { parsed ->
+ assert parsed.test instanceof Map
+ assert parsed.test.msg == "Hello, world"
+ assert parsed.first.second.third == [ "one", "two", "three", "four", "five" ]
+ }
+ }
+
+ @Test
+ void testUnFlattenWithDifferentSeparator() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ "first_second_third": [ "one", "two", "three", "four", "five" ]
+ ]))
+
+ testRunner.setProperty(FlattenJson.SEPARATOR, "_")
+ testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+ baseTest(testRunner, json, 1) { parsed ->
+ assert parsed.first instanceof Map
+ assert parsed.first.second.third == [ "one", "two", "three", "four", "five" ]
+ }
+ }
+
+ @Test
+ void testUnFlattenForKeepArraysMode() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ "a.b": 1,
+ "a.c": [
+ false,
+ ["i.j": [ false, true, "xy" ] ]
+ ]
+ ]))
+
+ testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_ARRAYS)
+ testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+ baseTest(testRunner, json, 1) { parsed ->
+ assert parsed.a instanceof Map
+ assert parsed.a.b == 1
+ assert parsed.a.c[0] == false
+ assert parsed.a.c[1].i instanceof Map
+ assert parsed.a.c[1].i.j == [false, true, "xy"]
+ }
+ }
+
+ @Test
+ void testUnFlattenForKeepPrimitiveArraysMode() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ "first.second[0].x": 1,
+ "first.second[0].y": 2,
+ "first.second[0].z": [3, 4, 5],
+ "first.second[1]": [6, 7, 8],
+ "first.second[2][0]": [9, 10],
+ "first.second[2][1]": 11,
+ "first.second[2][2]": 12,
+ "first.third.a": "b",
+ "first.third.c": "d",
+ "first.third.e": "f"
+ ]))
+
+ testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_KEEP_PRIMITIVE_ARRAYS)
+ testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+ baseTest(testRunner, json, 1) { parsed ->
+ assert parsed.first instanceof Map
+ assert parsed.first.second[0].x == 1
+ assert parsed.first.second[2][0] == [9, 10]
+ assert parsed.first.third.c == "d"
+ }
+ }
+
+ @Test
+ void testUnFlattenForDotNotationMode() {
+ def testRunner = TestRunners.newTestRunner(FlattenJson.class)
+ def json = prettyPrint(toJson([
+ "first.second.third.0": ["one", "two", "three", "four", "five"]
+ ]))
+
+ testRunner.setProperty(FlattenJson.FLATTEN_MODE, FlattenJson.FLATTEN_MODE_DOT_NOTATION)
+ testRunner.setProperty(FlattenJson.RETURN_TYPE, FlattenJson.RETURN_TYPE_UNFLATTEN)
+ baseTest(testRunner, json,1) { parsed ->
+ assert parsed.first instanceof Map
+ assert parsed.first.second.third[0] == ["one", "two", "three", "four", "five"]
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index a55771f..670eb92 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -305,9 +305,9 @@
<version>1.2.8</version>
</dependency>
<dependency>
- <groupId>com.github.wnameless</groupId>
+ <groupId>com.github.wnameless.json</groupId>
<artifactId>json-flattener</artifactId>
- <version>0.7.1</version>
+ <version>0.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.bval</groupId>