You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/10 16:57:23 UTC
nifi git commit: NIFI-5382 ConvertRecord Add Option To Drop 0 Record Files
Repository: nifi
Updated Branches:
refs/heads/master e94f0757d -> 624bbab8f
NIFI-5382 ConvertRecord Add Option To Drop 0 Record Files
NIFI-5382 ConvertRecord Add Option To Drop 0 Record Files
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2853
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/624bbab8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/624bbab8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/624bbab8
Branch: refs/heads/master
Commit: 624bbab8f06cf8e49c6229d8a2473a429548fcac
Parents: e94f075
Author: patricker <pa...@gmail.com>
Authored: Fri Jul 6 14:16:11 2018 -0600
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jul 10 12:56:53 2018 -0400
----------------------------------------------------------------------
.../standard/AbstractRecordProcessor.java | 19 +++++++++-
.../nifi/processors/standard/ConvertRecord.java | 11 ++++++
.../processors/standard/TestConvertRecord.java | 38 ++++++++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/624bbab8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 6f777ea..9dbe8c1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
@@ -64,6 +65,17 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
.required(true)
.build();
+ static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder() // switch that decides whether a FlowFile whose conversion produced zero records is still emitted
+ .name("include-zero-record-flowfiles")
+ .displayName("Include Zero Record FlowFiles")
+ .description("When converting an incoming FlowFile, if the conversion results in no data, "
+ + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE) // literal value only; Expression Language is not evaluated for this property
+ .allowableValues("true", "false")
+ .defaultValue("true") // default keeps the behaviour from before this change: the FlowFile is always transferred
+ .required(true)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are successfully transformed will be routed to this relationship")
@@ -99,6 +111,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet()? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean():true;
final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
@@ -142,7 +155,11 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
}
flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
+ if(!includeZeroRecordFlowFiles && recordCount.get() == 0){
+ session.remove(flowFile);
+ } else {
+ session.transfer(flowFile, REL_SUCCESS);
+ }
final int count = recordCount.get();
session.adjustCounter("Records Processed", count, false);
http://git-wip-us.apache.org/repos/asf/nifi/blob/624bbab8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index f87339d..a2d4e69 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -26,11 +26,15 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.util.ArrayList;
+import java.util.List;
+
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -49,6 +53,13 @@ import org.apache.nifi.serialization.record.RecordSchema;
public class ConvertRecord extends AbstractRecordProcessor {
@Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { // returns the inherited descriptors plus INCLUDE_ZERO_RECORD_FLOWFILES
+ final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); // copy into a fresh list so the list returned by the superclass is not modified
+ properties.add(INCLUDE_ZERO_RECORD_FLOWFILES); // only this processor adds the option; the descriptor itself is declared in AbstractRecordProcessor
+ return properties;
+ }
+
+ @Override
protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) { // per-record hook; writeSchema, flowFile and context are not used here
return record; // hands the incoming record back unchanged
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/624bbab8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index de3a428..7d3f316 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -62,6 +62,44 @@ public class TestConvertRecord {
out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
}
+ @Test
+ public void testDropEmpty() throws InitializationException { // checks that a zero-record result is dropped when INCLUDE_ZERO_RECORD_FLOWFILES is "false", while a non-empty result is still routed to success
+ final MockRecordParser readerService = new MockRecordParser();
+ final MockRecordWriter writerService = new MockRecordWriter("header", false);
+
+ final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
+ runner.addControllerService("reader", readerService);
+ runner.enableControllerService(readerService);
+ runner.addControllerService("writer", writerService);
+ runner.enableControllerService(writerService);
+
+ runner.setProperty(ConvertRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "false"); // override the "true" default so empty output is dropped
+ runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+ runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+ readerService.addSchemaField("name", RecordFieldType.STRING);
+ readerService.addSchemaField("age", RecordFieldType.INT);
+
+ runner.enqueue(""); // first pass: the reader has a schema but no records have been added yet
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 0); // nothing may reach success for the empty input
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 0); // and it must not be treated as a failure either
+
+ readerService.addRecord("John Doe", 48);
+ readerService.addRecord("Jane Doe", 47);
+ readerService.addRecord("Jimmy Doe", 14);
+
+ runner.enqueue(""); // second pass: the reader now has three records
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); // the non-empty result is still emitted
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+
+ out.assertAttributeEquals("record.count", "3");
+ out.assertAttributeEquals("mime.type", "text/plain");
+ out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
+ }
@Test
public void testReadFailure() throws InitializationException {