You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/10 16:57:23 UTC

nifi git commit: NIFI-5382 ConvertRecord Add Option To Drop 0 Record Files

Repository: nifi
Updated Branches:
  refs/heads/master e94f0757d -> 624bbab8f


NIFI-5382 ConvertRecord Add Option To Drop 0 Record Files

NIFI-5382 ConvertRecord Add Option To Drop 0 Record Files

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2853


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/624bbab8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/624bbab8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/624bbab8

Branch: refs/heads/master
Commit: 624bbab8f06cf8e49c6229d8a2473a429548fcac
Parents: e94f075
Author: patricker <pa...@gmail.com>
Authored: Fri Jul 6 14:16:11 2018 -0600
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jul 10 12:56:53 2018 -0400

----------------------------------------------------------------------
 .../standard/AbstractRecordProcessor.java       | 19 +++++++++-
 .../nifi/processors/standard/ConvertRecord.java | 11 ++++++
 .../processors/standard/TestConvertRecord.java  | 38 ++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/624bbab8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 6f777ea..9dbe8c1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -64,6 +65,17 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
         .required(true)
         .build();
 
+    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
+            .name("include-zero-record-flowfiles")
+            .displayName("Include Zero Record FlowFiles")
+            .description("When converting an incoming FlowFile, if the conversion results in no data, "
+                    + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("FlowFiles that are successfully transformed will be routed to this relationship")
@@ -99,6 +111,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet()? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean():true;
 
         final Map<String, String> attributes = new HashMap<>();
         final AtomicInteger recordCount = new AtomicInteger();
@@ -142,7 +155,11 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
         }
 
         flowFile = session.putAllAttributes(flowFile, attributes);
-        session.transfer(flowFile, REL_SUCCESS);
+        if(!includeZeroRecordFlowFiles && recordCount.get() == 0){
+            session.remove(flowFile);
+        } else {
+            session.transfer(flowFile, REL_SUCCESS);
+        }
 
         final int count = recordCount.get();
         session.adjustCounter("Records Processed", count, false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/624bbab8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index f87339d..a2d4e69 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -26,11 +26,15 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @EventDriven
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -49,6 +53,13 @@ import org.apache.nifi.serialization.record.RecordSchema;
 public class ConvertRecord extends AbstractRecordProcessor {
 
     @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(INCLUDE_ZERO_RECORD_FLOWFILES);
+        return properties;
+    }
+
+    @Override
     protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) {
         return record;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/624bbab8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index de3a428..7d3f316 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -62,6 +62,44 @@ public class TestConvertRecord {
         out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
     }
 
+    @Test
+    public void testDropEmpty() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser();
+        final MockRecordWriter writerService = new MockRecordWriter("header", false);
+
+        final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConvertRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "false");
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 0);
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 0);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
+    }
 
     @Test
     public void testReadFailure() throws InitializationException {