You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/02/15 17:00:09 UTC

[nifi] branch master updated: NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to same Content Claim, and a Processor attempted to read two of them within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic for considering a processor to be 'running' / having 'active threads' if the processor is invalid upon NiFi restart but scheduled to run. Fixed NPE [...]

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b508d6b  NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to same Content Claim, and a Processor attempted to read two of them within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic for considering a processor to be 'running' / having 'active threads' if the processor is invalid upon NiFi restart but scheduled [...]
b508d6b is described below

commit b508d6bfbc2b6a68fe2ab152de96611f06a8724a
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Feb 14 15:11:59 2019 -0500

    NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to same Content Claim, and a Processor attempted to read two of them within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic for considering a processor to be 'running' / having 'active threads' if the processor is invalid upon NiFi restart but scheduled to run. Fixed [...]
    
    This closes #3309.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../serialization/record/MockRecordWriter.java     | 29 ++++++++----
 .../nifi/controller/AbstractComponentNode.java     | 17 ++++---
 .../nifi/controller/StandardProcessorNode.java     |  5 +-
 .../repository/StandardProcessSession.java         |  4 +-
 .../queue/clustered/LoadBalancedQueueIT.java       |  4 +-
 .../scheduling/ProcessorLifecycleIT.java           | 50 --------------------
 .../nifi/processors/standard/MergeRecord.java      |  6 +--
 .../nifi/processors/standard/merge/RecordBin.java  | 54 ++++++++++++++--------
 .../standard/merge/RecordBinManager.java           |  9 +---
 .../nifi/processors/standard/TestMergeRecord.java  | 23 ++++++++-
 .../SchemaRegistryRecordSetWriter.java             |  6 ++-
 11 files changed, 101 insertions(+), 106 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index d7579e8..9d6b087 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -17,11 +17,6 @@
 
 package org.apache.nifi.serialization.record;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -30,27 +25,43 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
 public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
     private final String header;
     private final int failAfterN;
     private final boolean quoteValues;
+    private final boolean bufferOutput;
 
     public MockRecordWriter() {
         this(null);
     }
 
     public MockRecordWriter(final String header) {
-        this(header, true, -1);
+        this(header, true, -1, false);
     }
 
     public MockRecordWriter(final String header, final boolean quoteValues) {
-        this(header, quoteValues, -1);
+        this(header, quoteValues, false);
     }
 
     public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
+        this(header, quoteValues, failAfterN, false);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) {
+        this(header, quoteValues, -1, bufferOutput);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) {
         this.header = header;
         this.quoteValues = quoteValues;
         this.failAfterN = failAfterN;
+        this.bufferOutput = bufferOutput;
     }
 
     @Override
@@ -59,7 +70,9 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream rawOut) {
+        final OutputStream out = bufferOutput ? new BufferedOutputStream(rawOut) : rawOut;
+
         return new RecordSetWriter() {
             private int recordCount = 0;
             private boolean headerWritten = false;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 1f03923..f78c9d2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -204,8 +204,12 @@ public abstract class AbstractComponentNode implements ComponentNode {
                 }
             }
 
-            logger.debug("Resetting Validation State of {} due to setting properties", this);
-            resetValidationState();
+            if (isTriggerValidation()) {
+                logger.debug("Resetting Validation State of {} due to setting properties", this);
+                resetValidationState();
+            } else {
+                logger.debug("Properties set for {} but not resettingn validation state because validation is paused", this);
+            }
         } finally {
             lock.unlock();
         }
@@ -642,13 +646,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
     public void resumeValidationTrigger() {
         triggerValidation = true;
 
-        final ValidationStatus validationStatus = getValidationStatus();
-        if (validationStatus == ValidationStatus.VALIDATING) {
-            logger.debug("Resuming Triggering of Validation State for {}; status is VALIDATING so will trigger async validation now", this);
-            validationTrigger.triggerAsync(this);
-        } else {
-            logger.debug("Resuming Triggering of Validation State for {}; status is {} so will not trigger async validation now", this, validationStatus);
-        }
+        logger.debug("Resuming Triggering of Validation State for {}; Resetting validation state", this);
+        resetValidationState();
     }
 
     private boolean isTriggerValidation() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index e59e7fd..3e407f5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1367,7 +1367,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         }
 
         if (starting) { // will ensure that the Processor represented by this node can only be started once
-            hasActiveThreads = true;
             initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, processContext, schedulingAgentCallback);
         } else {
             final String procName = processorRef.get().toString();
@@ -1395,7 +1394,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
 
         final Map<Long, ThreadInfo> threadInfoMap = Stream.of(infos)
-            .collect(Collectors.toMap(info -> info.getThreadId(), Function.identity(), (a, b) -> a));
+            .collect(Collectors.toMap(ThreadInfo::getThreadId, Function.identity(), (a, b) -> a));
 
         final List<ActiveThreadInfo> threadList = new ArrayList<>(activeThreads.size());
         for (final Map.Entry<Thread, ActiveTask> entry : activeThreads.entrySet()) {
@@ -1509,6 +1508,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
             try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
                 try {
+                    hasActiveThreads = true;
+
                     activateThread();
                     try {
                         ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 071a052..604eb7e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2173,8 +2173,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
             if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
                 if (currentReadClaim == claim) {
-                    if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
-                        final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
+                    if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) {
+                        final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset();
                         if (bytesToSkip > 0) {
                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
                         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index 615ae00..e28e178 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -307,7 +307,7 @@ public class LoadBalancedQueueIT {
         }
     }
 
-    @Test(timeout = 60_000)
+    @Test(timeout = 90_000)
     public void testFailover() throws IOException, InterruptedException {
         localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
         nodeIdentifiers.add(localNodeId);
@@ -371,7 +371,7 @@ public class LoadBalancedQueueIT {
                 final int expectedFlowFileReceiveCount = flowFilesPerNode + flowFilesPerNode / 2;
 
                 // Wait up to 10 seconds for the server's FlowFile Repository to be updated
-                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
+                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
                 while (serverRepoRecords.size() < expectedFlowFileReceiveCount && System.currentTimeMillis() < endTime) {
                     Thread.sleep(10L);
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index 55b015d..890d799 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -485,21 +485,6 @@ public class ProcessorLifecycleIT {
         assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
     }
 
-    /**
-     * Validate that processor will not be validated on failing
-     * PropertyDescriptor validation.
-     */
-    @Test(expected = IllegalStateException.class)
-    public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
-        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        flowManager = fcsb.getFlowManager();
-
-        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
-        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
-        processScheduler.startProcessor(testProcNode, true);
-        fail();
-    }
 
     /**
      * Validate that processor will not be validated on failing
@@ -527,41 +512,6 @@ public class ProcessorLifecycleIT {
         fail();
     }
 
-    /**
-     * The successful processor start with ControllerService dependency.
-     */
-    @Test
-    public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
-        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        flowManager = fcsb.getFlowManager();
-
-        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
-
-        ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "foo",
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
-        testGroup.addControllerService(testServiceNode);
-
-        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
-        testGroup.addProcessor(testProcNode);
-
-        properties.put("S", testServiceNode.getIdentifier());
-        testProcNode.setProperties(properties);
-
-        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
-        testProcessor.withService = true;
-        this.noop(testProcessor);
-
-        testServiceNode.performValidation();
-        processScheduler.enableControllerService(testServiceNode);
-
-        testProcNode.performValidation();
-        processScheduler.startProcessor(testProcNode, true);
-
-        Thread.sleep(500);
-        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
-    }
-
 
     /**
      * Scenario where onTrigger() is executed with random delay limited to
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index ecebb8e..130d6b6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -314,7 +314,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
 
         final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
         final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
+        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
             block = true;
         } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
             block = true;
@@ -378,12 +378,12 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
 
     protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
         final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
+        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
             return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
         }
 
         final Optional<String> optionalText = schema.getSchemaText();
-        final String schemaText = optionalText.isPresent() ? optionalText.get() : AvroTypeUtil.extractAvroSchema(schema).toString();
+        final String schemaText = optionalText.orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());
 
         final String groupId;
         final String correlationshipAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 23f5edf..d15ba0f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -46,8 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class RecordBin {
-    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
-    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
 
     private final ComponentLog logger;
     private final ProcessSession session;
@@ -70,6 +68,8 @@ public class RecordBin {
     private static final AtomicLong idGenerator = new AtomicLong(0L);
     private final long id = idGenerator.getAndIncrement();
 
+    private volatile int requiredRecordCount = -1;
+
 
     public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) {
         this.session = session;
@@ -95,7 +95,6 @@ public class RecordBin {
     }
 
     public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException {
-
         if (isComplete()) {
             logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
             return false;
@@ -148,6 +147,12 @@ public class RecordBin {
             flowFileMigrated = true;
             this.flowFiles.add(flowFile);
 
+            if (recordCount >= getMinimumRecordCount()) {
+                // If we have met our minimum record count, we need to flush so that when we reach the desired number of bytes
+                // the bin is considered 'full enough'.
+                recordWriter.flush();
+            }
+
             if (isFull()) {
                 logger.debug(this + " is now full. Completing bin.");
                 complete("Bin is full");
@@ -232,6 +237,29 @@ public class RecordBin {
         }
     }
 
+    private int getMinimumRecordCount() {
+        final int currentCount = requiredRecordCount;
+        if (currentCount > -1) {
+            return currentCount;
+        }
+
+        int requiredCount;
+        final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+        if (recordCountAttribute.isPresent()) {
+            final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
+            try {
+                requiredCount = Integer.parseInt(recordCountValue);
+            } catch (final NumberFormatException e) {
+                requiredCount = 1;
+            }
+        } else {
+            requiredCount = thresholds.getMinRecords();
+        }
+
+        this.requiredRecordCount = requiredCount;
+        return requiredCount;
+    }
+
     public boolean isFullEnough() {
         readLock.lock();
         try {
@@ -239,19 +267,7 @@ public class RecordBin {
                 return false;
             }
 
-            int requiredRecordCount;
-            final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
-            if (recordCountAttribute.isPresent()) {
-                final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
-                try {
-                    requiredRecordCount = Integer.parseInt(recordCountValue);
-                } catch (final NumberFormatException e) {
-                    requiredRecordCount = 1;
-                }
-            } else {
-                requiredRecordCount = thresholds.getMinRecords();
-            }
-
+            final int requiredRecordCount = getMinimumRecordCount();
             return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes());
         } finally {
             readLock.unlock();
@@ -386,11 +402,11 @@ public class RecordBin {
             attributes.putAll(writeResult.getAttributes());
             attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
             attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
-            attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
-            attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
+            attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
+            attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
 
             merged = session.putAllAttributes(merged, attributes);
-            flowFiles.stream().forEach(ff -> session.putAttribute(ff, "merge.uuid", merged.getAttribute(CoreAttributes.UUID.key())));
+            flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key())));
 
             session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
             session.transfer(merged, MergeRecord.REL_MERGED);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index 312832e..d1dde2a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -26,8 +26,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.standard.MergeContent;
 import org.apache.nifi.processors.standard.MergeRecord;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 
 import java.io.IOException;
@@ -109,12 +107,9 @@ public class RecordBinManager {
      * @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
      *            that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
      *
-     * @throws SchemaNotFoundException if unable to find the schema for the record writer
-     * @throws MalformedRecordException if unable to read a record
      * @throws IOException if there is an IO problem reading from the stream or writing to the stream
      */
-    public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
-        throws IOException, MalformedRecordException, SchemaNotFoundException {
+    public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block) throws IOException {
 
         final List<RecordBin> currentBins;
         lock.lock();
@@ -244,7 +239,7 @@ public class RecordBinManager {
     }
 
     public int completeFullEnoughBins() throws IOException {
-        return handleCompletedBins(bin -> bin.isFullEnough());
+        return handleCompletedBins(RecordBin::isFullEnough);
     }
 
     private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 4ba57af..c54bf2a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -44,7 +44,7 @@ public class TestMergeRecord {
         runner = TestRunners.newTestRunner(new MergeRecord());
 
         readerService = new CommaSeparatedRecordReader();
-        writerService = new MockRecordWriter("header", false);
+        writerService = new MockRecordWriter("header", false, true);
 
         runner.addControllerService("reader", readerService);
 
@@ -58,6 +58,25 @@ public class TestMergeRecord {
     }
 
     @Test
+    public void testSmallOutputIsFlushed() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "1");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "1");
+
+        runner.enqueue("Name, Age\nJohn, 35\nJane, 34");
+
+        runner.run(1);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertContentEquals("header\nJohn,35\nJane,34\n");
+
+        runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
+            ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
+    }
+
+    @Test
     public void testMergeSimple() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "2");
         runner.setProperty(MergeRecord.MAX_RECORDS, "2");
@@ -73,7 +92,7 @@ public class TestMergeRecord {
         mff.assertAttributeEquals("record.count", "2");
         mff.assertContentEquals("header\nJohn,35\nJane,34\n");
 
-        runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).stream().forEach(
+        runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
                 ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 734168b..57ec05a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -135,8 +135,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         this.configurationContext = context;
 
         final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
-        final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
-        this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+        if (strategy != null) {
+            final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
+            this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+        }
     }
 
     @Override