You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ch...@apache.org on 2022/11/22 21:17:41 UTC
[nifi] 01/01: NIFI-10865 allow RecordPath's unescapeJson to convert de-serialised JSON Objects into Records
This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch NIFI-10865
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit bf5fdaf5022e6f4f547fc782c1ee6fbde08fe108
Author: Chris Sampson <ch...@naimuri.com>
AuthorDate: Tue Nov 22 11:44:53 2022 +0000
NIFI-10865 allow RecordPath's unescapeJson to convert de-serialised JSON Objects into Records
NIFI-10865 allow UpdateRecord to replace the Record root for relative paths, e.g. when a RecordPath function is used to modify selected field(s)
---
.../nifi/record/path/functions/UnescapeJson.java | 35 ++++-
.../nifi/record/path/paths/RecordPathCompiler.java | 11 +-
.../apache/nifi/record/path/TestRecordPath.java | 145 ++++++++++++---------
nifi-docs/src/main/asciidoc/record-path-guide.adoc | 22 +++-
.../nifi/processors/standard/UpdateRecord.java | 29 +++--
.../nifi/processors/standard/TestUpdateRecord.java | 59 +++++++++
.../input/person-stringified-name.json | 4 +
.../TestUpdateRecord/output/person-with-name.json | 7 +
.../schema/person-with-stringified-name.avsc | 9 ++
9 files changed, 248 insertions(+), 73 deletions(-)
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
index 6a18320fe8..35f7d93d3f 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
@@ -23,7 +23,10 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
@@ -31,21 +34,29 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Map;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
public class UnescapeJson extends RecordPathSegment {
private final RecordPathSegment recordPath;
+ private final RecordPathSegment convertToRecordRecordPath;
+
private final ObjectMapper objectMapper = new ObjectMapper();
- public UnescapeJson(final RecordPathSegment recordPath, final boolean absolute) {
+ public UnescapeJson(final RecordPathSegment recordPath, final RecordPathSegment convertToRecordRecordPath, final boolean absolute) {
super("unescapeJson", null, absolute);
this.recordPath = recordPath;
+ this.convertToRecordRecordPath = convertToRecordRecordPath;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final boolean convertMapToRecord = convertToRecordRecordPath != null
+ && Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(convertToRecordRecordPath, context));
+
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
@@ -58,7 +69,10 @@ public class UnescapeJson extends RecordPathSegment {
dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) fv.getField().getDataType());
}
- return new StandardFieldValue(convertFieldValue(value, fv.getField().getFieldName(), dataType), fv.getField(), fv.getParent().orElse(null));
+ return new StandardFieldValue(
+ convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord),
+ fv.getField(), fv.getParent().orElse(null)
+ );
} catch (IOException e) {
throw new RecordPathException("Unable to deserialise JSON String into Record Path value", e);
}
@@ -69,7 +83,7 @@ public class UnescapeJson extends RecordPathSegment {
}
@SuppressWarnings("unchecked")
- private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType) throws IOException {
+ private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType, final boolean convertMapToRecord) throws IOException {
if (dataType instanceof RecordDataType) {
// convert Maps to Records
final Map<String, Object> map = objectMapper.readValue(value.toString(), Map.class);
@@ -85,7 +99,20 @@ public class UnescapeJson extends RecordPathSegment {
return arr;
} else {
// generic conversion for simpler fields
- return objectMapper.readValue(value.toString(), Object.class);
+ final Object parsed = objectMapper.readValue(value.toString(), Object.class);
+ if (convertMapToRecord) {
+ if (DataTypeUtils.isCompatibleDataType(parsed, RecordFieldType.RECORD.getDataType())) {
+ return DataTypeUtils.toRecord(parsed, fieldName);
+ } else if (DataTypeUtils.isArrayTypeCompatible(parsed, RecordFieldType.RECORD.getDataType())) {
+ return Arrays.stream((Object[]) parsed).map(m -> DataTypeUtils.toRecord(m, fieldName)).toArray(Record[]::new);
+ } else if (parsed instanceof Collection
+ && !((Collection<Object>) parsed).isEmpty()
+ && DataTypeUtils.isCompatibleDataType(((Collection<Object>) parsed).stream().findFirst().get(), RecordFieldType.RECORD.getDataType())) {
+ return ((Collection<Object>) parsed).stream().map(m -> DataTypeUtils.toRecord(m, fieldName)).collect(Collectors.toList());
+ }
+ }
+
+ return parsed;
}
}
}
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index d966382569..b51e068323 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -328,8 +328,15 @@ public class RecordPathCompiler {
return new EscapeJson(args[0], absolute);
}
case "unescapeJson": {
- final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
- return new UnescapeJson(args[0], absolute);
+ final int numArgs = argumentListTree.getChildCount();
+
+ if (numArgs == 1) {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
+ return new UnescapeJson(args[0], null, absolute);
+ } else {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new UnescapeJson(args[0], args[1], absolute);
+ }
}
case "hash":{
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index db3b13b786..264ae09f9e 100644
--- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -53,13 +53,14 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("OptionalGetWithoutIsPresent")
public class TestRecordPath {
private static final String USER_TIMEZONE_PROPERTY = "user.timezone";
@@ -280,7 +281,7 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get();
- assertTrue(fieldValue.getField().getFieldName().equals("attributes"));
+ assertEquals("attributes", fieldValue.getField().getFieldName());
assertEquals("New York", fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get());
}
@@ -300,7 +301,7 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("/attributes/.['city']").evaluate(record).getSelectedFields().findFirst().get();
- assertTrue(fieldValue.getField().getFieldName().equals("attributes"));
+ assertEquals("attributes", fieldValue.getField().getFieldName());
assertEquals("New York", fieldValue.getValue());
assertEquals(record, fieldValue.getParentRecord().get());
}
@@ -1094,24 +1095,24 @@ public class TestRecordPath {
// Special character cases
values.put("name", "John Doe");
- assertEquals("Replacing whitespace to new line",
- "John\nDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\n')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("John\nDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\n')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing whitespace to new line");
values.put("name", "John\nDoe");
- assertEquals("Replacing new line to whitespace",
- "John Doe", RecordPath.compile("replaceRegex(/name, '\\n', ' ')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, '\\n', ' ')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing new line to whitespace");
values.put("name", "John Doe");
- assertEquals("Replacing whitespace to tab",
- "John\tDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\t')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("John\tDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\t')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing whitespace to tab");
values.put("name", "John\tDoe");
- assertEquals("Replacing tab to whitespace",
- "John Doe", RecordPath.compile("replaceRegex(/name, '\\t', ' ')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, '\\t', ' ')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing tab to whitespace");
}
@@ -1129,27 +1130,27 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values);
// Quotes
- // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't to do so at NiFi UI.
+ // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't do so at NiFi UI.
// The test record path is equivalent to replaceRegex(/name, '\'', '"')
values.put("name", "'John' 'Doe'");
- assertEquals("Replacing quote to double-quote",
- "\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, '\\'', '\"')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, '\\'', '\"')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing quote to double-quote");
values.put("name", "\"John\" \"Doe\"");
- assertEquals("Replacing double-quote to single-quote",
- "'John' 'Doe'", RecordPath.compile("replaceRegex(/name, '\"', '\\'')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("'John' 'Doe'", RecordPath.compile("replaceRegex(/name, '\"', '\\'')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing double-quote to single-quote");
values.put("name", "'John' 'Doe'");
- assertEquals("Replacing quote to double-quote, the function arguments are wrapped by double-quote",
- "\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, \"'\", \"\\\"\")")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, \"'\", \"\\\"\")")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing quote to double-quote, the function arguments are wrapped by double-quote");
values.put("name", "\"John\" \"Doe\"");
- assertEquals("Replacing double-quote to single-quote, the function arguments are wrapped by double-quote",
- "'John' 'Doe'", RecordPath.compile("replaceRegex(/name, \"\\\"\", \"'\")")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("'John' 'Doe'", RecordPath.compile("replaceRegex(/name, \"\\\"\", \"'\")")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing double-quote to single-quote, the function arguments are wrapped by double-quote");
}
@@ -1167,17 +1168,17 @@ public class TestRecordPath {
final Record record = new MapRecord(schema, values);
// Back-slash
- // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't to do so at NiFi UI.
+ // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't do so at NiFi UI.
// The test record path is equivalent to replaceRegex(/name, '\\', '/')
values.put("name", "John\\Doe");
- assertEquals("Replacing a back-slash to forward-slash",
- "John/Doe", RecordPath.compile("replaceRegex(/name, '\\\\', '/')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("John/Doe", RecordPath.compile("replaceRegex(/name, '\\\\', '/')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing a back-slash to forward-slash");
values.put("name", "John/Doe");
- assertEquals("Replacing a forward-slash to back-slash",
- "John\\Doe", RecordPath.compile("replaceRegex(/name, '/', '\\\\')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("John\\Doe", RecordPath.compile("replaceRegex(/name, '/', '\\\\')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Replacing a forward-slash to back-slash");
}
@@ -1196,14 +1197,14 @@ public class TestRecordPath {
// Brackets
values.put("name", "J[o]hn Do[e]");
- assertEquals("Square brackets can be escaped with back-slash",
- "J(o)hn Do(e)", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\[', '('), '\\]', ')')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("J(o)hn Do(e)", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\[', '('), '\\]', ')')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Square brackets can be escaped with back-slash");
values.put("name", "J(o)hn Do(e)");
- assertEquals("Brackets can be escaped with back-slash",
- "J[o]hn Do[e]", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\(', '['), '\\)', ']')")
- .evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertEquals("J[o]hn Do[e]", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\(', '['), '\\)', ']')")
+ .evaluate(record).getSelectedFields().findFirst().get().getValue(),
+ "Brackets can be escaped with back-slash");
}
@Test
@@ -1640,8 +1641,8 @@ public class TestRecordPath {
RecordPath.compile("base64Encode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)),
RecordPath.compile("base64Encode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
- assertTrue(Arrays.equals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)),
- (byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
+ assertArrayEquals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)),
+ (byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue());
List<Object> actualValues = RecordPath.compile("base64Encode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
IntStream.range(0, 3).forEach(i -> {
Object expectedObject = expectedValues.get(i);
@@ -1649,7 +1650,7 @@ public class TestRecordPath {
if (actualObject instanceof String) {
assertEquals(expectedObject, actualObject);
} else if (actualObject instanceof byte[]) {
- assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject));
+ assertArrayEquals((byte[]) expectedObject, (byte[]) actualObject);
}
});
}
@@ -1671,7 +1672,7 @@ public class TestRecordPath {
assertEquals("John", RecordPath.compile("base64Decode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("Doe", RecordPath.compile("base64Decode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
- assertTrue(Arrays.equals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
+ assertArrayEquals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue());
List<Object> actualValues = RecordPath.compile("base64Decode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
IntStream.range(0, 3).forEach(i -> {
Object expectedObject = expectedValues.get(i);
@@ -1679,7 +1680,7 @@ public class TestRecordPath {
if (actualObject instanceof String) {
assertEquals(expectedObject, actualObject);
} else if (actualObject instanceof byte[]) {
- assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject));
+ assertArrayEquals((byte[]) expectedObject, (byte[]) actualObject);
}
});
}
@@ -1744,8 +1745,8 @@ public class TestRecordPath {
new RecordField("json_str", RecordFieldType.STRING.getDataType())
));
- // test CHOICE resulting in nested ARRAY of RECORDs
- final Record recordAddressesArray = new MapRecord(schema,
+ // test CHOICE resulting in nested ARRAY of Records
+ final Record mapAddressesArray = new MapRecord(schema,
Collections.singletonMap(
"json_str",
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}")
@@ -1760,11 +1761,11 @@ public class TestRecordPath {
Collections.singletonMap("address_1", "456 Anywhere Road")
));
}},
- RecordPath.compile("unescapeJson(/json_str)").evaluate(recordAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
);
// test CHOICE resulting in nested single RECORD
- final Record recordAddressesSingle = new MapRecord(schema,
+ final Record mapAddressesSingle = new MapRecord(schema,
Collections.singletonMap(
"json_str",
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":{\"address_1\":\"123 Somewhere Street\"}}")
@@ -1776,7 +1777,35 @@ public class TestRecordPath {
put("nicknames", Arrays.asList("J", "Johnny"));
put("addresses", Collections.singletonMap("address_1", "123 Somewhere Street"));
}},
- RecordPath.compile("unescapeJson(/json_str)").evaluate(recordAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ RecordPath.compile("unescapeJson(/json_str, 'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ );
+
+ // test single Record converted from Map Object
+ final Record recordFromMap = new MapRecord(schema,
+ Collections.singletonMap(
+ "json_str",
+ "{\"firstName\":\"John\",\"age\":30}")
+ );
+ assertEquals(
+ DataTypeUtils.toRecord(new HashMap<String, Object>(){{
+ put("firstName", "John");
+ put("age", 30);
+ }}, "json_str"),
+ RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ );
+
+ // test collection of Record converted from Map collection
+ final Record recordCollectionFromMaps = new MapRecord(schema,
+ Collections.singletonMap(
+ "json_str",
+ "[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]")
+ );
+ assertEquals(
+ Arrays.asList(
+ DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Somewhere Street"), "json_str"),
+ DataTypeUtils.toRecord(Collections.singletonMap("address_1", "456 Anywhere Road"), "json_str")
+ ),
+ RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
);
// test simple String field
@@ -1870,7 +1899,7 @@ public class TestRecordPath {
assertEquals("MyString", RecordPath.compile("padRight(/someString, 3)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("MyString", RecordPath.compile("padRight(/someString, -10)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("@@@@@@@@@@", RecordPath.compile("padRight(/emptyString, 10, '@')").evaluate(record).getSelectedFields().findFirst().get().getValue());
- assertNull(null, RecordPath.compile("padRight(/nullString, 10, '@')").evaluate(record).getSelectedFields().findFirst().get().getValue());
+ assertNull(RecordPath.compile("padRight(/nullString, 10, '@')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("MyStringxy", RecordPath.compile("padRight(/someString, 10, \"xy\")").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("MyStringaV", RecordPath.compile("padRight(/someString, 10, \"aVeryLongPadding\")").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("MyStringfewfewfewfew", RecordPath.compile("padRight(/someString, 20, \"few\")").evaluate(record).getSelectedFields().findFirst().get().getValue());
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index b59f78ade6..0eea95f193 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -893,7 +893,10 @@ The following record path expression would convert the record into an escaped JS
=== unescapeJson
-Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set. For example, given a schema such as:
+Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set.
+Optionally convert JSON Objects parsed as Maps into Records (defaults to false).
+
+For example, given a schema such as:
----
{
@@ -927,6 +930,23 @@ The following record path expression would populate the record with unescaped JS
Given a record such as:
+----
+{
+ "json_str": "{\"name\":\"John\",\"age\":30}"
+}
+----
+
+The following record path expression would return:
+
+|==========================================================
+| RecordPath | Return value
+| `unescapeJson(/json_str, 'true')` | {"name": "John", "age": 30} (as a Record)
+| `unescapeJson(/json_str, 'false')` | {"name"="John", "age"=30} (as a Map)
+| `unescapeJson(/json_str)` | {"name"="John", "age"=30} (as a Map)
+|==========================================================
+
+Given a record such as:
+
----
{
"json_str": "\"John\""
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 21581f4b54..0cd26a58b0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -201,32 +201,40 @@ public class UpdateRecord extends AbstractRecordProcessor {
}
private Record processAbsolutePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record) {
- final RecordPathResult replacementResult = replacementRecordPath.evaluate(record);
- final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
+ final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, null, record);
final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList());
return updateRecord(destinationFieldValues, selectedFields, record);
}
+ private boolean isReplacingRoot(final List<FieldValue> destinationFields) {
+ return destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent();
+ }
+
private Record processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, Record record) {
final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList());
- for (final FieldValue fieldVal : destinationFieldValues) {
- final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal);
- final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
- final Object replacementObject = getReplacementObject(selectedFields);
- updateFieldValue(fieldVal, replacementObject);
+ if (isReplacingRoot(destinationFieldValues)) {
+ final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, destinationFieldValues.get(0), record);
+ record = updateRecord(destinationFieldValues, selectedFields, record);
+ } else {
+ for (final FieldValue fieldVal : destinationFieldValues) {
+ final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, fieldVal, record);
+ final Object replacementObject = getReplacementObject(selectedFields);
+ updateFieldValue(fieldVal, replacementObject);
+ }
}
return record;
}
private Record updateRecord(final List<FieldValue> destinationFields, final List<FieldValue> selectedFields, final Record record) {
- if (destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent()) {
+ if (isReplacingRoot(destinationFields)) {
final Object replacement = getReplacementObject(selectedFields);
if (replacement == null) {
return record;
}
+
if (replacement instanceof Record) {
return (Record) replacement;
}
@@ -262,6 +270,11 @@ public class UpdateRecord extends AbstractRecordProcessor {
}
}
+ private List<FieldValue> getSelectedFields(final RecordPath replacementRecordPath, final FieldValue fieldValue, final Record record) {
+ final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldValue);
+ return replacementResult.getSelectedFields().collect(Collectors.toList());
+ }
+
private Object getReplacementObject(final List<FieldValue> selectedFields) {
if (selectedFields.size() > 1) {
final List<RecordField> fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index 9893833e14..6dc5f29152 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -438,6 +438,65 @@ public class TestUpdateRecord {
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
+ @Test
+ public void testSetRootWithUnescapeJsonCall() throws InitializationException, IOException {
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+
+ final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc")));
+ final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc")));
+
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", jsonWriter);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+ runner.enableControllerService(jsonWriter);
+
+ runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person-stringified-name.json"));
+ runner.setProperty("/", "unescapeJson(/stringified_name, 'true')");
+ runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+ final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json")));
+ runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
+ }
+
+ @Test
+ public void testSetFieldWithUnescapeJsonCall() throws InitializationException, IOException {
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+
+ final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc")));
+ final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")));
+
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", jsonWriter);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+ runner.enableControllerService(jsonWriter);
+
+ runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person-stringified-name.json"));
+ runner.setProperty("/name", "unescapeJson(/stringified_name)");
+ runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+ final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-name.json")));
+ runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
+ }
@Test
public void testSetRootPathRelativeWithMultipleValues() throws InitializationException, IOException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person-stringified-name.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person-stringified-name.json
new file mode 100644
index 0000000000..da4f643953
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person-stringified-name.json
@@ -0,0 +1,4 @@
+{
+ "id": 485,
+ "stringified_name": "{\"last\": \"Doe\", \"first\": \"John\"}"
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-name.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-name.json
new file mode 100644
index 0000000000..e153afedf6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-name.json
@@ -0,0 +1,7 @@
+[ {
+ "id" : 485,
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ }
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc
new file mode 100644
index 0000000000..d8f2bf25b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc
@@ -0,0 +1,9 @@
+{
+ "name": "personWithNameRecord",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "stringified_name", "type": "string" }
+ ]
+}
\ No newline at end of file