You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by tp...@apache.org on 2022/06/27 19:53:57 UTC

[nifi] branch main updated: NIFI-5005: Fix MergeRecord to honor writer's schema

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d379121c4 NIFI-5005: Fix MergeRecord to honor writer's schema
9d379121c4 is described below

commit 9d379121c42f2502ad56d1e1e9b7324847f89718
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Jun 21 11:24:16 2022 -0400

    NIFI-5005: Fix MergeRecord to honor writer's schema
    
    This closes #6141.
    
    Signed-off-by: Tamas Palfy <tp...@apache.org>
---
 .../serialization/record/MockRecordWriter.java     | 40 +++++++++++++++++-----
 .../nifi/processors/standard/merge/RecordBin.java  |  4 ++-
 .../nifi/processors/standard/TestForkRecord.java   | 10 +++---
 .../nifi/processors/standard/TestMergeRecord.java  | 33 ++++++++++++++++++
 4 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index fe7bde523f..1398922919 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
@@ -38,12 +39,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     private final boolean quoteValues;
     private final boolean bufferOutput;
 
+    private final RecordSchema writeSchema;
+
     public MockRecordWriter() {
         this(null);
     }
 
     public MockRecordWriter(final String header) {
-        this(header, true, -1, false);
+        this(header, true, -1, false, null);
     }
 
     public MockRecordWriter(final String header, final boolean quoteValues) {
@@ -51,23 +54,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
-        this(header, quoteValues, failAfterN, false);
+        this(header, quoteValues, failAfterN, false, null);
     }
 
     public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) {
-        this(header, quoteValues, -1, bufferOutput);
+        this(header, quoteValues, -1, bufferOutput, null);
     }
 
-    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) {
+    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput, final RecordSchema writeSchema) {
         this.header = header;
         this.quoteValues = quoteValues;
         this.failAfterN = failAfterN;
         this.bufferOutput = bufferOutput;
+        this.writeSchema = writeSchema;
     }
 
     @Override
     public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-        return new SimpleRecordSchema(Collections.emptyList());
+        return (writeSchema != null) ? writeSchema : new SimpleRecordSchema(Collections.emptyList());
     }
 
     @Override
@@ -78,6 +82,8 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
             private int recordCount = 0;
             private boolean headerWritten = false;
 
+            private RecordSchema writerSchema = schema;
+
             @Override
             public void flush() throws IOException {
                 out.flush();
@@ -98,10 +104,18 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
                         throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
                     }
 
-                    final int numCols = record.getSchema().getFieldCount();
+                    final int numCols;
+                    final List<String> fieldNames;
+                    if (this.writerSchema != null && this.writerSchema.getFieldCount() != 0) {
+                        fieldNames = this.writerSchema.getFieldNames();
+                        numCols = this.writerSchema.getFieldCount();
+                    } else {
+                        fieldNames = record.getSchema().getFieldNames();
+                        numCols = record.getSchema().getFieldCount();
+                    }
 
                     int i = 0;
-                    for (final String fieldName : record.getSchema().getFieldNames()) {
+                    for (final String fieldName : fieldNames) {
                         final String val = record.getAsString(fieldName);
                         if (val != null) {
                             if (quoteValues) {
@@ -140,9 +154,17 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
                     headerWritten = true;
                 }
 
-                final int numCols = record.getSchema().getFieldCount();
+                final int numCols;
+                final List<String> fieldNames;
+                if (this.writerSchema != null && this.writerSchema.getFieldCount() != 0) {
+                    fieldNames = this.writerSchema.getFieldNames();
+                    numCols = this.writerSchema.getFieldCount();
+                } else {
+                    fieldNames = record.getSchema().getFieldNames();
+                    numCols = record.getSchema().getFieldCount();
+                }
                 int i = 0;
-                for (final String fieldName : record.getSchema().getFieldNames()) {
+                for (final String fieldName : fieldNames) {
                     final String val = record.getAsString(fieldName);
                     if (val != null) {
                         if (quoteValues) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 7dfffb7e60..36e1f2a1de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -28,6 +28,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 
 import java.io.IOException;
@@ -130,7 +131,8 @@ public class RecordBin {
 
                 this.out = new ByteCountingOutputStream(rawOut);
 
-                recordWriter = writerFactory.createWriter(logger, recordReader.getSchema(), out, flowFile);
+                RecordSchema outputSchema = writerFactory.getSchema(flowFile.getAttributes(), recordReader.getSchema());
+                recordWriter = writerFactory.createWriter(logger, outputSchema, out, flowFile);
                 recordWriter.beginRecordSet();
             }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
index 05bace42af..b61bb0ced0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
@@ -172,7 +172,7 @@ public class TestForkRecord {
 
         final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
         mff.assertAttributeEquals("record.count", "2");
-        mff.assertContentEquals("header\n42,4750.89,John Doe,123 My Street,My City,MS,11111,USA\n43,48212.38,John Doe,123 My Street,My City,MS,11111,USA\n");
+        mff.assertContentEquals("header\n42,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n43,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
     }
 
     @Test
@@ -288,8 +288,8 @@ public class TestForkRecord {
 
         final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
         mff.assertAttributeEquals("record.count", "4");
-        mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n"
-                + "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
+        mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n"
+                + "7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
     }
 
     @Test
@@ -369,8 +369,8 @@ public class TestForkRecord {
 
         final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
         mff.assertAttributeEquals("record.count", "4");
-        mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n"
-                + "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
+        mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n"
+                + "7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 27496672f7..e330577516 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -19,8 +19,12 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
 import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -28,6 +32,7 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -93,6 +98,34 @@ public class TestMergeRecord {
                 ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
     }
 
+    @Test
+    public void testMergeSimpleDifferentWriteSchema() throws InitializationException {
+        // Exclude Age field
+        List<RecordField> writeFields = Collections.singletonList(
+          new RecordField("Name", RecordFieldType.STRING.getDataType())
+        );
+        RecordSchema writeSchema = new SimpleRecordSchema(writeFields);
+        writerService = new MockRecordWriter("header", false, -1, true, writeSchema);
+        runner.addControllerService("differentWriter", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(MergeRecord.RECORD_READER, "reader");
+        runner.setProperty(MergeRecord.RECORD_WRITER, "differentWriter");
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        runner.run(2);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertContentEquals("header\nJohn\nJane\n");
+
+        runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
+                ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
+    }
+
 
     // Verify that FlowFiles are grouped with like schemas.
     @Test