You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2022/06/27 19:53:57 UTC
[nifi] branch main updated: NIFI-5005: Fix MergeRecord to honor writer's schema
This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9d379121c4 NIFI-5005: Fix MergeRecord to honor writer's schema
9d379121c4 is described below
commit 9d379121c42f2502ad56d1e1e9b7324847f89718
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Tue Jun 21 11:24:16 2022 -0400
NIFI-5005: Fix MergeRecord to honor writer's schema
This closes #6141.
Signed-off-by: Tamas Palfy <tp...@apache.org>
---
.../serialization/record/MockRecordWriter.java | 40 +++++++++++++++++-----
.../nifi/processors/standard/merge/RecordBin.java | 4 ++-
.../nifi/processors/standard/TestForkRecord.java | 10 +++---
.../nifi/processors/standard/TestMergeRecord.java | 33 ++++++++++++++++++
4 files changed, 72 insertions(+), 15 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index fe7bde523f..1398922919 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
@@ -38,12 +39,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private final boolean quoteValues;
private final boolean bufferOutput;
+ private final RecordSchema writeSchema;
+
public MockRecordWriter() {
this(null);
}
public MockRecordWriter(final String header) {
- this(header, true, -1, false);
+ this(header, true, -1, false, null);
}
public MockRecordWriter(final String header, final boolean quoteValues) {
@@ -51,23 +54,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
- this(header, quoteValues, failAfterN, false);
+ this(header, quoteValues, failAfterN, false, null);
}
public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) {
- this(header, quoteValues, -1, bufferOutput);
+ this(header, quoteValues, -1, bufferOutput, null);
}
- public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) {
+ public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput, final RecordSchema writeSchema) {
this.header = header;
this.quoteValues = quoteValues;
this.failAfterN = failAfterN;
this.bufferOutput = bufferOutput;
+ this.writeSchema = writeSchema;
}
@Override
public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
- return new SimpleRecordSchema(Collections.emptyList());
+ return (writeSchema != null) ? writeSchema : new SimpleRecordSchema(Collections.emptyList());
}
@Override
@@ -78,6 +82,8 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private int recordCount = 0;
private boolean headerWritten = false;
+ private RecordSchema writerSchema = schema;
+
@Override
public void flush() throws IOException {
out.flush();
@@ -98,10 +104,18 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
}
- final int numCols = record.getSchema().getFieldCount();
+ final int numCols;
+ final List<String> fieldNames;
+ if (this.writerSchema != null && this.writerSchema.getFieldCount() != 0) {
+ fieldNames = this.writerSchema.getFieldNames();
+ numCols = this.writerSchema.getFieldCount();
+ } else {
+ fieldNames = record.getSchema().getFieldNames();
+ numCols = record.getSchema().getFieldCount();
+ }
int i = 0;
- for (final String fieldName : record.getSchema().getFieldNames()) {
+ for (final String fieldName : fieldNames) {
final String val = record.getAsString(fieldName);
if (val != null) {
if (quoteValues) {
@@ -140,9 +154,17 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
headerWritten = true;
}
- final int numCols = record.getSchema().getFieldCount();
+ final int numCols;
+ final List<String> fieldNames;
+ if (this.writerSchema != null && this.writerSchema.getFieldCount() != 0) {
+ fieldNames = this.writerSchema.getFieldNames();
+ numCols = this.writerSchema.getFieldCount();
+ } else {
+ fieldNames = record.getSchema().getFieldNames();
+ numCols = record.getSchema().getFieldCount();
+ }
int i = 0;
- for (final String fieldName : record.getSchema().getFieldNames()) {
+ for (final String fieldName : fieldNames) {
final String val = record.getAsString(fieldName);
if (val != null) {
if (quoteValues) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 7dfffb7e60..36e1f2a1de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -28,6 +28,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import java.io.IOException;
@@ -130,7 +131,8 @@ public class RecordBin {
this.out = new ByteCountingOutputStream(rawOut);
- recordWriter = writerFactory.createWriter(logger, recordReader.getSchema(), out, flowFile);
+ RecordSchema outputSchema = writerFactory.getSchema(flowFile.getAttributes(), recordReader.getSchema());
+ recordWriter = writerFactory.createWriter(logger, outputSchema, out, flowFile);
recordWriter.beginRecordSet();
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
index 05bace42af..b61bb0ced0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java
@@ -172,7 +172,7 @@ public class TestForkRecord {
final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
mff.assertAttributeEquals("record.count", "2");
- mff.assertContentEquals("header\n42,4750.89,John Doe,123 My Street,My City,MS,11111,USA\n43,48212.38,John Doe,123 My Street,My City,MS,11111,USA\n");
+ mff.assertContentEquals("header\n42,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n43,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
}
@Test
@@ -288,8 +288,8 @@ public class TestForkRecord {
final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
mff.assertAttributeEquals("record.count", "4");
- mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n"
- + "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
+ mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n"
+ + "7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
}
@Test
@@ -369,8 +369,8 @@ public class TestForkRecord {
final MockFlowFile mff = runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
mff.assertAttributeEquals("record.count", "4");
- mff.assertContentEquals("header\n5,150.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n6,-15.31,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n"
- + "7,36.78,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n8,-21.34,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
+ mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n"
+ + "7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
}
@Test
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 27496672f7..e330577516 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -19,8 +19,12 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -28,6 +32,7 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -93,6 +98,34 @@ public class TestMergeRecord {
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
}
+ @Test
+ public void testMergeSimpleDifferentWriteSchema() throws InitializationException {
+ // Exclude Age field
+ List<RecordField> writeFields = Collections.singletonList(
+ new RecordField("Name", RecordFieldType.STRING.getDataType())
+ );
+ RecordSchema writeSchema = new SimpleRecordSchema(writeFields);
+ writerService = new MockRecordWriter("header", false, -1, true, writeSchema);
+ runner.addControllerService("differentWriter", writerService);
+ runner.enableControllerService(writerService);
+
+ runner.setProperty(MergeRecord.RECORD_READER, "reader");
+ runner.setProperty(MergeRecord.RECORD_WRITER, "differentWriter");
+ runner.enqueue("Name, Age\nJohn, 35");
+ runner.enqueue("Name, Age\nJane, 34");
+
+ runner.run(2);
+ runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+ runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+ mff.assertAttributeEquals("record.count", "2");
+ mff.assertContentEquals("header\nJohn\nJane\n");
+
+ runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
+ ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
+ }
+
// Verify that FlowFiles are grouped with like schemas.
@Test