You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/02/15 17:00:09 UTC
[nifi] branch master updated: NIFI-6033, NIFI-6034, NIFI-6035,
NIFI-6036,
NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If
multiple FlowFiles were written to same Content Claim,
and a Processor attempted to read two of them wi within a single session,
it would seek to the wrong part of the content or else throw a
ContentNotFoundException. Updated logic for considering a processor to be
'running' / having 'active threads' if the processor is invalid upon NiFi
restart but scheduled to run. Fixed NPE [...]
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b508d6b NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to same Content Claim, and a Processor attempted to read two of them wi within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic for considering a processor to be 'running' / having 'active threads' if the processor is invalid upon NiFi restart but scheduled [...]
b508d6b is described below
commit b508d6bfbc2b6a68fe2ab152de96611f06a8724a
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Feb 14 15:11:59 2019 -0500
NIFI-6033, NIFI-6034, NIFI-6035, NIFI-6036, NIFI-6037: Fixed bugs that were found during 1.9.0-RC1 validation. If multiple FlowFiles were written to same Content Claim, and a Processor attempted to read two of them wi within a single session, it would seek to the wrong part of the content or else throw a ContentNotFoundException. Updated logic for considering a processor to be 'running' / having 'active threads' if the processor is invalid upon NiFi restart but scheduled to run. Fixed [...]
This closes #3309.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../serialization/record/MockRecordWriter.java | 29 ++++++++----
.../nifi/controller/AbstractComponentNode.java | 17 ++++---
.../nifi/controller/StandardProcessorNode.java | 5 +-
.../repository/StandardProcessSession.java | 4 +-
.../queue/clustered/LoadBalancedQueueIT.java | 4 +-
.../scheduling/ProcessorLifecycleIT.java | 50 --------------------
.../nifi/processors/standard/MergeRecord.java | 6 +--
.../nifi/processors/standard/merge/RecordBin.java | 54 ++++++++++++++--------
.../standard/merge/RecordBinManager.java | 9 +---
.../nifi/processors/standard/TestMergeRecord.java | 23 ++++++++-
.../SchemaRegistryRecordSetWriter.java | 6 ++-
11 files changed, 101 insertions(+), 106 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index d7579e8..9d6b087 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -17,11 +17,6 @@
package org.apache.nifi.serialization.record;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -30,27 +25,43 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
private final String header;
private final int failAfterN;
private final boolean quoteValues;
+ private final boolean bufferOutput;
public MockRecordWriter() {
this(null);
}
public MockRecordWriter(final String header) {
- this(header, true, -1);
+ this(header, true, -1, false);
}
public MockRecordWriter(final String header, final boolean quoteValues) {
- this(header, quoteValues, -1);
+ this(header, quoteValues, false);
}
public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
+ this(header, quoteValues, failAfterN, false);
+ }
+
+ public MockRecordWriter(final String header, final boolean quoteValues, final boolean bufferOutput) {
+ this(header, quoteValues, -1, bufferOutput);
+ }
+
+ public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN, final boolean bufferOutput) {
this.header = header;
this.quoteValues = quoteValues;
this.failAfterN = failAfterN;
+ this.bufferOutput = bufferOutput;
}
@Override
@@ -59,7 +70,9 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) {
+ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream rawOut) {
+ final OutputStream out = bufferOutput ? new BufferedOutputStream(rawOut) : rawOut;
+
return new RecordSetWriter() {
private int recordCount = 0;
private boolean headerWritten = false;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 1f03923..f78c9d2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -204,8 +204,12 @@ public abstract class AbstractComponentNode implements ComponentNode {
}
}
- logger.debug("Resetting Validation State of {} due to setting properties", this);
- resetValidationState();
+ if (isTriggerValidation()) {
+ logger.debug("Resetting Validation State of {} due to setting properties", this);
+ resetValidationState();
+ } else {
+ logger.debug("Properties set for {} but not resettingn validation state because validation is paused", this);
+ }
} finally {
lock.unlock();
}
@@ -642,13 +646,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
public void resumeValidationTrigger() {
triggerValidation = true;
- final ValidationStatus validationStatus = getValidationStatus();
- if (validationStatus == ValidationStatus.VALIDATING) {
- logger.debug("Resuming Triggering of Validation State for {}; status is VALIDATING so will trigger async validation now", this);
- validationTrigger.triggerAsync(this);
- } else {
- logger.debug("Resuming Triggering of Validation State for {}; status is {} so will not trigger async validation now", this, validationStatus);
- }
+ logger.debug("Resuming Triggering of Validation State for {}; Resetting validation state", this);
+ resetValidationState();
}
private boolean isTriggerValidation() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index e59e7fd..3e407f5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1367,7 +1367,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
if (starting) { // will ensure that the Processor represented by this node can only be started once
- hasActiveThreads = true;
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, processContext, schedulingAgentCallback);
} else {
final String procName = processorRef.get().toString();
@@ -1395,7 +1394,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(infos)
- .collect(Collectors.toMap(info -> info.getThreadId(), Function.identity(), (a, b) -> a));
+ .collect(Collectors.toMap(ThreadInfo::getThreadId, Function.identity(), (a, b) -> a));
final List<ActiveThreadInfo> threadList = new ArrayList<>(activeThreads.size());
for (final Map.Entry<Thread, ActiveTask> entry : activeThreads.entrySet()) {
@@ -1509,6 +1508,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
try {
+ hasActiveThreads = true;
+
activateThread();
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 071a052..604eb7e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2173,8 +2173,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
if (currentReadClaim == claim) {
- if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
- final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
+ if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) {
+ final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset();
if (bytesToSkip > 0) {
StreamUtils.skip(currentReadClaimStream, bytesToSkip);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index 615ae00..e28e178 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -307,7 +307,7 @@ public class LoadBalancedQueueIT {
}
}
- @Test(timeout = 60_000)
+ @Test(timeout = 90_000)
public void testFailover() throws IOException, InterruptedException {
localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
nodeIdentifiers.add(localNodeId);
@@ -371,7 +371,7 @@ public class LoadBalancedQueueIT {
final int expectedFlowFileReceiveCount = flowFilesPerNode + flowFilesPerNode / 2;
// Wait up to 10 seconds for the server's FlowFile Repository to be updated
- final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
while (serverRepoRecords.size() < expectedFlowFileReceiveCount && System.currentTimeMillis() < endTime) {
Thread.sleep(10L);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index 55b015d..890d799 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -485,21 +485,6 @@ public class ProcessorLifecycleIT {
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE);
}
- /**
- * Validate that processor will not be validated on failing
- * PropertyDescriptor validation.
- */
- @Test(expected = IllegalStateException.class)
- public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
- final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- flowManager = fcsb.getFlowManager();
-
- ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
- ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
- fcsb.getSystemBundle().getBundleDetails().getCoordinate());
- processScheduler.startProcessor(testProcNode, true);
- fail();
- }
/**
* Validate that processor will not be validated on failing
@@ -527,41 +512,6 @@ public class ProcessorLifecycleIT {
fail();
}
- /**
- * The successful processor start with ControllerService dependency.
- */
- @Test
- public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
- final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
- flowManager = fcsb.getFlowManager();
-
- ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
-
- ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "foo",
- fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
- testGroup.addControllerService(testServiceNode);
-
- ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
- fcsb.getSystemBundle().getBundleDetails().getCoordinate());
- testGroup.addProcessor(testProcNode);
-
- properties.put("S", testServiceNode.getIdentifier());
- testProcNode.setProperties(properties);
-
- TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
- testProcessor.withService = true;
- this.noop(testProcessor);
-
- testServiceNode.performValidation();
- processScheduler.enableControllerService(testServiceNode);
-
- testProcNode.performValidation();
- processScheduler.startProcessor(testProcNode, true);
-
- Thread.sleep(500);
- assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
- }
-
/**
* Scenario where onTrigger() is executed with random delay limited to
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index ecebb8e..130d6b6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -314,7 +314,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
final boolean block;
- if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
block = true;
} else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
block = true;
@@ -378,12 +378,12 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
- if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
}
final Optional<String> optionalText = schema.getSchemaText();
- final String schemaText = optionalText.isPresent() ? optionalText.get() : AvroTypeUtil.extractAvroSchema(schema).toString();
+ final String schemaText = optionalText.orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());
final String groupId;
final String correlationshipAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 23f5edf..d15ba0f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -46,8 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class RecordBin {
- public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
- public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
private final ComponentLog logger;
private final ProcessSession session;
@@ -70,6 +68,8 @@ public class RecordBin {
private static final AtomicLong idGenerator = new AtomicLong(0L);
private final long id = idGenerator.getAndIncrement();
+ private volatile int requiredRecordCount = -1;
+
public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) {
this.session = session;
@@ -95,7 +95,6 @@ public class RecordBin {
}
public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException {
-
if (isComplete()) {
logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
return false;
@@ -148,6 +147,12 @@ public class RecordBin {
flowFileMigrated = true;
this.flowFiles.add(flowFile);
+ if (recordCount >= getMinimumRecordCount()) {
+ // If we have met our minimum record count, we need to flush so that when we reach the desired number of bytes
+ // the bin is considered 'full enough'.
+ recordWriter.flush();
+ }
+
if (isFull()) {
logger.debug(this + " is now full. Completing bin.");
complete("Bin is full");
@@ -232,6 +237,29 @@ public class RecordBin {
}
}
+ private int getMinimumRecordCount() {
+ final int currentCount = requiredRecordCount;
+ if (currentCount > -1) {
+ return currentCount;
+ }
+
+ int requiredCount;
+ final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+ if (recordCountAttribute.isPresent()) {
+ final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
+ try {
+ requiredCount = Integer.parseInt(recordCountValue);
+ } catch (final NumberFormatException e) {
+ requiredCount = 1;
+ }
+ } else {
+ requiredCount = thresholds.getMinRecords();
+ }
+
+ this.requiredRecordCount = requiredCount;
+ return requiredCount;
+ }
+
public boolean isFullEnough() {
readLock.lock();
try {
@@ -239,19 +267,7 @@ public class RecordBin {
return false;
}
- int requiredRecordCount;
- final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
- if (recordCountAttribute.isPresent()) {
- final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
- try {
- requiredRecordCount = Integer.parseInt(recordCountValue);
- } catch (final NumberFormatException e) {
- requiredRecordCount = 1;
- }
- } else {
- requiredRecordCount = thresholds.getMinRecords();
- }
-
+ final int requiredRecordCount = getMinimumRecordCount();
return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes());
} finally {
readLock.unlock();
@@ -386,11 +402,11 @@ public class RecordBin {
attributes.putAll(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
- attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
- attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
+ attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
+ attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
merged = session.putAllAttributes(merged, attributes);
- flowFiles.stream().forEach(ff -> session.putAttribute(ff, "merge.uuid", merged.getAttribute(CoreAttributes.UUID.key())));
+ flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key())));
session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
session.transfer(merged, MergeRecord.REL_MERGED);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index 312832e..d1dde2a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -26,8 +26,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.processors.standard.MergeRecord;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import java.io.IOException;
@@ -109,12 +107,9 @@ public class RecordBinManager {
* @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
* that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
*
- * @throws SchemaNotFoundException if unable to find the schema for the record writer
- * @throws MalformedRecordException if unable to read a record
* @throws IOException if there is an IO problem reading from the stream or writing to the stream
*/
- public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
- throws IOException, MalformedRecordException, SchemaNotFoundException {
+ public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block) throws IOException {
final List<RecordBin> currentBins;
lock.lock();
@@ -244,7 +239,7 @@ public class RecordBinManager {
}
public int completeFullEnoughBins() throws IOException {
- return handleCompletedBins(bin -> bin.isFullEnough());
+ return handleCompletedBins(RecordBin::isFullEnough);
}
private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 4ba57af..c54bf2a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -44,7 +44,7 @@ public class TestMergeRecord {
runner = TestRunners.newTestRunner(new MergeRecord());
readerService = new CommaSeparatedRecordReader();
- writerService = new MockRecordWriter("header", false);
+ writerService = new MockRecordWriter("header", false, true);
runner.addControllerService("reader", readerService);
@@ -58,6 +58,25 @@ public class TestMergeRecord {
}
@Test
+ public void testSmallOutputIsFlushed() {
+ runner.setProperty(MergeRecord.MIN_RECORDS, "1");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "1");
+
+ runner.enqueue("Name, Age\nJohn, 35\nJane, 34");
+
+ runner.run(1);
+ runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+ runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+ mff.assertAttributeEquals("record.count", "2");
+ mff.assertContentEquals("header\nJohn,35\nJane,34\n");
+
+ runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
+ ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
+ }
+
+ @Test
public void testMergeSimple() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
runner.setProperty(MergeRecord.MAX_RECORDS, "2");
@@ -73,7 +92,7 @@ public class TestMergeRecord {
mff.assertAttributeEquals("record.count", "2");
mff.assertContentEquals("header\nJohn,35\nJane,34\n");
- runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).stream().forEach(
+ runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 734168b..57ec05a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -135,8 +135,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
this.configurationContext = context;
final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
- final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
- this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+ if (strategy != null) {
+ final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
+ this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+ }
}
@Override