You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/06/29 15:28:24 UTC

[nifi] branch main updated: NIFI-10169: When using the Insert Record Fields join strategy of JoinEnrichment, ensure that in order to combine schemas from the original record and the enrichment record we use incorporateSchema() so that even when the first enrichment record is null, we get the schema correct

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 39c366eeef NIFI-10169: When using the Insert Record Fields join strategy of JoinEnrichment, ensure that in order to combine schemas from the original record and the enrichment record we use incorporateSchema() so that even when the first enrichment record is null, we get the schema correct
39c366eeef is described below

commit 39c366eeef6a8c5be496ccab9c395a4fe94be9c3
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Sat Jun 25 11:52:38 2022 -0400

    NIFI-10169: When using the Insert Record Fields join strategy of JoinEnrichment, ensure that in order to combine schemas from the original record and the enrichment record we use incorporateSchema() so that even when the first enrichment record is null, we get the schema correct
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #6157.
---
 .../nifi-standard-processors/pom.xml               |  1 +
 .../enrichment/InsertRecordFieldsJoinStrategy.java | 23 ++++++++++-
 .../processors/standard/TestJoinEnrichment.java    | 45 +++++++++++++++++++++-
 .../insert-enrichment-first-value-null.json        | 10 +++++
 4 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 07acc867d0..7d5c95c037 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -668,6 +668,7 @@
                         <exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
                         <exclude>src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro</exclude>
                         <exclude>src/test/resources/TestJoinEnrichment/insert-enrichment.json</exclude>
+                        <exclude>src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json</exclude>
                         <exclude>src/test/resources/TestJoinEnrichment/insert-original.json</exclude>
                         <exclude>src/test/resources/TestJoinEnrichment/left-outer-join-enrichment.csv</exclude>
                         <exclude>src/test/resources/TestJoinEnrichment/left-outer-join-expected.csv</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java
index 7f09bf928e..8051c492af 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/InsertRecordFieldsJoinStrategy.java
@@ -39,6 +39,21 @@ public class InsertRecordFieldsJoinStrategy extends IndexCorrelatedJoinStrategy
 
     @Override
     protected Record combineRecords(final Record originalRecord, final Record enrichmentRecord, final RecordSchema resultSchema) {
+        // We only need to incorporate the enrichment record's schema when determining the result schema. After that,
+        // we will use the result schema for writing, not the Record's schema. So we can avoid the expense of incorporating
+        // the enrichment record's fields into the original record's schema.
+        return combineRecords(originalRecord, enrichmentRecord, false);
+    }
+
+    /**
+     * Creates a single record that combines both the original record and the enrichment record
+     * @param originalRecord the original record
+     * @param enrichmentRecord the enrichment record
+     * @param incorporateEnrichmentSchema whether or not to update the originalRecord's schema to include the fields of the enrichmentRecord. Doing so can be
+     * expensive and is not necessary if the Record's schema will not be used.
+     * @return the combined record, or <code>null</code> if the originalRecord is <code>null</code>
+     */
+    private Record combineRecords(final Record originalRecord, final Record enrichmentRecord, final boolean incorporateEnrichmentSchema) {
         if (originalRecord == null) {
             return null;
         }
@@ -57,17 +72,21 @@ public class InsertRecordFieldsJoinStrategy extends IndexCorrelatedJoinStrategy
             }
 
             final Record parentRecord = (Record) value;
-            enrichmentRecord.toMap().forEach(parentRecord::setValue);
+            if (incorporateEnrichmentSchema) {
+                parentRecord.incorporateSchema(enrichmentRecord.getSchema());
+            }
 
+            enrichmentRecord.toMap().forEach(parentRecord::setValue);
             parentRecord.incorporateInactiveFields();
         }
 
         return originalRecord;
     }
 
+
     @Override
     protected RecordSchema createResultSchema(final Record firstOriginalRecord, final Record firstEnrichmentRecord) {
-        final Record combined = combineRecords(firstOriginalRecord, firstEnrichmentRecord, firstOriginalRecord.getSchema());
+        final Record combined = combineRecords(firstOriginalRecord, firstEnrichmentRecord, true);
         combined.incorporateInactiveFields();
         return combined.getSchema();
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
index 1a30332dce..70568ec733 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class TestJoinEnrichment {
     private static final File EXAMPLES_DIR = new File("src/test/resources/TestJoinEnrichment");
@@ -149,7 +150,7 @@ public class TestJoinEnrichment {
 
     // Tests that the Insert Enrichment Record Fields example in the Additional Details produces expected output
     @Test
-    public void testInsertEnrichmentFields() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {
+    public void testInsertEnrichmentFields() throws InitializationException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
 
         final ArrayListRecordWriter writer = setupJsonServices(runner);
@@ -191,6 +192,48 @@ public class TestJoinEnrichment {
         assertEquals("jane.doe@nifi.apache.org", secondCustomerDetails.getValue("email"));
     }
 
+    // Tests that when the first enrichment record has a null value, we still properly apply subsequent enrichments.
+    @Test
+    public void testFirstEnrichmentRecordNull() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
+
+        final ArrayListRecordWriter writer = setupJsonServices(runner);
+        runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_INSERT_ENRICHMENT_FIELDS);
+        runner.setProperty(JoinEnrichment.INSERTION_RECORD_PATH, "/purchase/customer");
+
+        final Map<String, String> originalAttributes = new HashMap<>();
+        originalAttributes.put("enrichment.group.id", "abc");
+        originalAttributes.put("enrichment.role", "ORIGINAL");
+        runner.enqueue(new File(EXAMPLES_DIR, "insert-original.json").toPath(), originalAttributes);
+
+        final Map<String, String> enrichmentAttributes = new HashMap<>();
+        enrichmentAttributes.put("enrichment.group.id", "abc");
+        enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
+        runner.enqueue(new File(EXAMPLES_DIR, "insert-enrichment-first-value-null.json").toPath(), enrichmentAttributes);
+
+        runner.run();
+
+        runner.assertTransferCount(JoinEnrichment.REL_JOINED, 1);
+        runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 2);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(2, written.size());
+
+        final RecordPath recordPath = RecordPath.compile("/purchase/customer/customerDetails");
+
+        final List<Object> firstCustomerDetailsList = recordPath.evaluate(written.get(0)).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
+        assertEquals(1, firstCustomerDetailsList.size());
+        final Record customerDetails = (Record) firstCustomerDetailsList.get(0);
+        assertNull(customerDetails);
+
+        final List<Object> secondCustomerDetailsList = recordPath.evaluate(written.get(1)).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
+        assertEquals(1, secondCustomerDetailsList.size());
+        final Record secondCustomerDetails = (Record) secondCustomerDetailsList.get(0);
+        assertEquals(5512, secondCustomerDetails.getValue("id"));
+        assertEquals("555-555-5511", secondCustomerDetails.getValue("phone"));
+        assertEquals("jane.doe@nifi.apache.org", secondCustomerDetails.getValue("email"));
+    }
+
 
     private List<Record> readCsvRecords(final File file) throws IOException, SchemaNotFoundException, MalformedRecordException {
         final CommaSeparatedRecordReader reader = new CommaSeparatedRecordReader();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json
new file mode 100644
index 0000000000..9631f11d2c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestJoinEnrichment/insert-enrichment-first-value-null.json
@@ -0,0 +1,10 @@
+[
+  {
+    "customerDetails": null
+  }, {
+  "customerDetails": {
+    "id": 5512,
+    "phone": "555-555-5511",
+    "email": "jane.doe@nifi.apache.org"
+  }
+}]
\ No newline at end of file