You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/03 20:12:24 UTC
[nifi] 06/17: NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch NIFI-6169-RC2
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 11a9f10f5b67c53d8ab66dbfd04ffdcba681ead7
Author: Andres Garagiola <an...@gmail.com>
AuthorDate: Fri Feb 22 17:15:59 2019 -0300
NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
This closes #3334.
Signed-off-by: Andres Garagiola <an...@gmail.com>
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../nifi/processors/standard/merge/RecordBin.java | 55 +++++++++-------------
.../standard/merge/RecordBinManager.java | 10 ++--
.../standard/merge/RecordBinThresholds.java | 10 ++--
3 files changed, 31 insertions(+), 44 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index d15ba0f..a96e119 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -63,6 +63,7 @@ public class RecordBin {
private RecordSetWriter recordWriter;
private ByteCountingOutputStream out;
private int recordCount = 0;
+ private int fragmentCount = 0;
private volatile boolean complete = false;
private static final AtomicLong idGenerator = new AtomicLong(0L);
@@ -114,7 +115,7 @@ public class RecordBin {
}
boolean flowFileMigrated = false;
-
+ this.fragmentCount++;
try {
if (isComplete()) {
logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
@@ -202,26 +203,7 @@ public class RecordBin {
return false;
}
- int maxRecords;
- final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
- if (recordCountAttribute.isPresent()) {
- final Optional<String> recordCountValue = flowFiles.stream()
- .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null)
- .map(ff -> ff.getAttribute(recordCountAttribute.get()))
- .findFirst();
-
- if (!recordCountValue.isPresent()) {
- return false;
- }
-
- try {
- maxRecords = Integer.parseInt(recordCountValue.get());
- } catch (final NumberFormatException e) {
- maxRecords = 1;
- }
- } else {
- maxRecords = thresholds.getMaxRecords();
- }
+ int maxRecords = thresholds.getMaxRecords();
if (recordCount >= maxRecords) {
return true;
@@ -231,6 +213,22 @@ public class RecordBin {
return true;
}
+ Optional<String> fragmentCountAttribute = thresholds.getFragmentCountAttribute();
+ if(fragmentCountAttribute != null && fragmentCountAttribute.isPresent()) {
+ final Optional<String> fragmentCountValue = flowFiles.stream()
+ .filter(ff -> ff.getAttribute(fragmentCountAttribute.get()) != null)
+ .map(ff -> ff.getAttribute(fragmentCountAttribute.get()))
+ .findFirst();
+ if (fragmentCountValue.isPresent()) {
+ try {
+ int expectedFragments = Integer.parseInt(fragmentCountValue.get());
+ if (this.fragmentCount == expectedFragments)
+ return true;
+ } catch (NumberFormatException nfe) {
+ this.logger.error(nfe.getMessage(), nfe);
+ }
+ }
+ }
return false;
} finally {
readLock.unlock();
@@ -243,18 +241,7 @@ public class RecordBin {
return currentCount;
}
- int requiredCount;
- final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
- if (recordCountAttribute.isPresent()) {
- final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
- try {
- requiredCount = Integer.parseInt(recordCountValue);
- } catch (final NumberFormatException e) {
- requiredCount = 1;
- }
- } else {
- requiredCount = thresholds.getMinRecords();
- }
+ int requiredCount = thresholds.getMinRecords();
this.requiredRecordCount = requiredCount;
return requiredCount;
@@ -347,7 +334,7 @@ public class RecordBin {
}
// If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin.
- final Optional<String> countAttr = thresholds.getRecordCountAttribute();
+ final Optional<String> countAttr = thresholds.getFragmentCountAttribute();
if (countAttr.isPresent()) {
// Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value.
Integer expectedBinCount = null;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index d1dde2a..f0891a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -189,17 +189,17 @@ public class RecordBinManager {
final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
final String maxBinAge = maxMillisValue.getValue();
- final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE;
+ final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS) : Long.MAX_VALUE;
- final String recordCountAttribute;
+ final String fragmentCountAttribute;
final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
- recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+ fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
} else {
- recordCountAttribute = null;
+ fragmentCountAttribute = null;
}
- return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+ return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, fragmentCountAttribute);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
index 16c05ac..8f1a5c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -26,17 +26,17 @@ public class RecordBinThresholds {
private final long maxBytes;
private final long maxBinMillis;
private final String maxBinAge;
- private final Optional<String> recordCountAttribute;
+ private final Optional<String> fragmentCountAttribute;
public RecordBinThresholds(final int minRecords, final int maxRecords, final long minBytes, final long maxBytes, final long maxBinMillis,
- final String maxBinAge, final String recordCountAttribute) {
+ final String maxBinAge, final String fragmentCountAttribute) {
this.minRecords = minRecords;
this.maxRecords = maxRecords;
this.minBytes = minBytes;
this.maxBytes = maxBytes;
this.maxBinMillis = maxBinMillis;
this.maxBinAge = maxBinAge;
- this.recordCountAttribute = Optional.ofNullable(recordCountAttribute);
+ this.fragmentCountAttribute = Optional.ofNullable(fragmentCountAttribute);
}
public int getMinRecords() {
@@ -63,7 +63,7 @@ public class RecordBinThresholds {
return maxBinAge;
}
- public Optional<String> getRecordCountAttribute() {
- return recordCountAttribute;
+ public Optional<String> getFragmentCountAttribute() {
+ return fragmentCountAttribute;
}
}