You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/07/12 20:58:12 UTC
[2/2] nifi git commit: NIFI-4060: Initial implementation of
MergeRecord
NIFI-4060: Initial implementation of MergeRecord
NIFI-4060: Addressed threading issue with RecordBin being updated after it is completed; fixed issue that caused mime.type attribute not to be written properly if all incoming flowfiles already have a different value for that attribute
NIFI-4060: Bug fixes; improved documentation; added a lot of debug information; updated StandardProcessSession to produce more accurate logs in case of a session being committed/rolled back with open input/output streams
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1958
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b603cb95
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b603cb95
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b603cb95
Branch: refs/heads/master
Commit: b603cb955dcd1d3d9b5e374e5760f2f9b047bda9
Parents: eefad29
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jun 26 13:15:03 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Jul 12 16:36:48 2017 -0400
----------------------------------------------------------------------
.../nifi/processor/util/bin/BinFiles.java | 14 +-
.../record/CommaSeparatedRecordReader.java | 102 +++++
.../repository/StandardProcessSession.java | 19 +-
.../nifi/processors/standard/MergeContent.java | 104 +----
.../nifi/processors/standard/MergeRecord.java | 358 ++++++++++++++++
.../standard/merge/AttributeStrategy.java | 27 ++
.../standard/merge/AttributeStrategyUtil.java | 56 +++
.../merge/KeepCommonAttributeStrategy.java | 64 +++
.../merge/KeepUniqueAttributeStrategy.java | 58 +++
.../processors/standard/merge/RecordBin.java | 424 +++++++++++++++++++
.../standard/merge/RecordBinManager.java | 295 +++++++++++++
.../standard/merge/RecordBinThresholds.java | 69 +++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 229 ++++++++++
.../processors/standard/TestMergeContent.java | 9 +-
.../processors/standard/TestMergeRecord.java | 360 ++++++++++++++++
16 files changed, 2076 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index b15d23b..7f79b70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -131,9 +131,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
*
* @param context context
* @param flowFile flowFile
+ * @param session the session for accessing the FlowFile
* @return The appropriate group ID
*/
- protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
+ protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session);
/**
* Performs any additional setup of the bin manager. Called during the OnScheduled phase.
@@ -271,8 +272,15 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
for (FlowFile flowFile : flowFiles) {
flowFile = this.preprocessFlowFile(context, session, flowFile);
- final String groupingIdentifier = getGroupId(context, flowFile);
- flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
+
+ try {
+ final String groupingIdentifier = getGroupId(context, flowFile, session);
+ flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
+ } catch (final Exception e) {
+ getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e);
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
}
for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
new file mode 100644
index 0000000..8973055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+
+public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
+ private int failAfterN;
+ private int recordCount = 0;
+
+ public CommaSeparatedRecordReader() {
+ this(-1);
+ }
+
+ public CommaSeparatedRecordReader(final int failAfterN) {
+ this.failAfterN = failAfterN;
+ }
+
+ public void failAfter(final int failAfterN) {
+ this.failAfterN = failAfterN;
+ }
+
+ @Override
+ public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+
+ final List<RecordField> fields = new ArrayList<>();
+
+ final String headerLine = reader.readLine();
+ for (final String colName : headerLine.split(",")) {
+ fields.add(new RecordField(colName.trim(), RecordFieldType.STRING.getDataType()));
+ }
+
+ return new RecordReader() {
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public Record nextRecord() throws IOException, MalformedRecordException {
+ if (failAfterN > -1 && recordCount >= failAfterN) {
+ throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+ }
+
+ final String nextLine = reader.readLine();
+ if (nextLine == null) {
+ return null;
+ }
+
+ recordCount++;
+
+ final String[] values = nextLine.split(",");
+ final Map<String, Object> valueMap = new HashMap<>();
+ int i = 0;
+ for (final RecordField field : fields) {
+ final String fieldName = field.getFieldName();
+ valueMap.put(fieldName, values[i++].trim());
+ }
+
+ return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return new SimpleRecordSchema(fields);
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index d34d8cf..d2a6af6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -191,13 +191,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
processingStartTime = System.nanoTime();
}
- private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap) {
+ private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final String action, final String streamType) {
final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List
for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) {
final FlowFile flowFile = entry.getKey();
final Closeable openStream = entry.getValue();
- LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile);
+ LOG.warn("{} closing {} for {} because the session was {} without the {} stream being closed.", this, openStream, flowFile, action, streamType);
try {
openStream.close();
@@ -212,8 +212,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
resetWriteClaims(false);
- closeStreams(openInputStreams);
- closeStreams(openOutputStreams);
+ closeStreams(openInputStreams, "committed", "input");
+ closeStreams(openOutputStreams, "committed", "output");
if (!readRecursionSet.isEmpty()) {
throw new IllegalStateException();
@@ -914,8 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
deleteOnCommit.clear();
- closeStreams(openInputStreams);
- closeStreams(openOutputStreams);
+ closeStreams(openInputStreams, "rolled back", "input");
+ closeStreams(openOutputStreams, "rolled back", "output");
try {
claimCache.reset();
@@ -2171,7 +2171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
}
- final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
+ final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
@@ -2470,7 +2470,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long bytesWritten = countingOut.getBytesWritten();
StandardProcessSession.this.bytesWritten += bytesWritten;
- openOutputStreams.remove(sourceFlowFile);
+ final OutputStream removed = openOutputStreams.remove(sourceFlowFile);
+ if (removed == null) {
+ LOG.error("Closed Session's OutputStream but there was no entry for it in the map; sourceFlowFile={}; map={}", sourceFlowFile, openOutputStreams);
+ }
flush();
removeTemporaryClaim(record);
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 3401d66..edbc033 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -82,6 +82,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.bin.Bin;
import org.apache.nifi.processor.util.bin.BinFiles;
import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processors.standard.merge.AttributeStrategy;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.stream.io.NonCloseableOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager;
@@ -126,7 +128,7 @@ import org.apache.nifi.util.FlowFilePackagerV3;
@WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
-@SeeAlso(SegmentContent.class)
+@SeeAlso({SegmentContent.class, MergeRecord.class})
public class MergeContent extends BinFiles {
// preferred attributes
@@ -201,8 +203,6 @@ public class MergeContent extends BinFiles {
MERGE_FORMAT_AVRO_VALUE,
"The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");
- public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes";
- public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes";
public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
@@ -224,16 +224,6 @@ public class MergeContent extends BinFiles {
.allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
.defaultValue(MERGE_FORMAT_CONCAT.getValue())
.build();
- public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
- .required(true)
- .name("Attribute Strategy")
- .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
- + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
- + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
- + "value, will be preserved.")
- .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
- .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON)
- .build();
public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Correlation Attribute Name")
@@ -315,7 +305,7 @@ public class MergeContent extends BinFiles {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(MERGE_STRATEGY);
descriptors.add(MERGE_FORMAT);
- descriptors.add(ATTRIBUTE_STRATEGY);
+ descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
descriptors.add(CORRELATION_ATTRIBUTE_NAME);
descriptors.add(MIN_ENTRIES);
descriptors.add(MAX_ENTRIES);
@@ -378,7 +368,7 @@ public class MergeContent extends BinFiles {
}
@Override
- protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
+ protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME)
.evaluateAttributeExpressions(flowFile).getValue();
String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
@@ -429,16 +419,7 @@ public class MergeContent extends BinFiles {
throw new AssertionError();
}
- final AttributeStrategy attributeStrategy;
- switch (context.getProperty(ATTRIBUTE_STRATEGY).getValue()) {
- case ATTRIBUTE_STRATEGY_ALL_UNIQUE:
- attributeStrategy = new KeepUniqueAttributeStrategy();
- break;
- case ATTRIBUTE_STRATEGY_ALL_COMMON:
- default:
- attributeStrategy = new KeepCommonAttributeStrategy();
- break;
- }
+ final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
final List<FlowFile> contents = bin.getContents();
final ProcessSession binSession = bin.getSession();
@@ -989,76 +970,7 @@ public class MergeContent extends BinFiles {
}
}
- private static class KeepUniqueAttributeStrategy implements AttributeStrategy {
-
- @Override
- public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
- final Map<String, String> newAttributes = new HashMap<>();
- final Set<String> conflicting = new HashSet<>();
-
- for (final FlowFile flowFile : flowFiles) {
- for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) {
- final String name = attributeEntry.getKey();
- final String value = attributeEntry.getValue();
-
- final String existingValue = newAttributes.get(name);
- if (existingValue != null && !existingValue.equals(value)) {
- conflicting.add(name);
- } else {
- newAttributes.put(name, value);
- }
- }
- }
-
- for (final String attributeToRemove : conflicting) {
- newAttributes.remove(attributeToRemove);
- }
-
- // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
- newAttributes.remove(CoreAttributes.UUID.key());
- return newAttributes;
- }
- }
-
- private static class KeepCommonAttributeStrategy implements AttributeStrategy {
-
- @Override
- public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
- final Map<String, String> result = new HashMap<>();
-
- //trivial cases
- if (flowFiles == null || flowFiles.isEmpty()) {
- return result;
- } else if (flowFiles.size() == 1) {
- result.putAll(flowFiles.iterator().next().getAttributes());
- }
-
- /*
- * Start with the first attribute map and only put an entry to the
- * resultant map if it is common to every map.
- */
- final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes();
-
- outer:
- for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
- final String key = mapEntry.getKey();
- final String value = mapEntry.getValue();
-
- for (final FlowFile flowFile : flowFiles) {
- final Map<String, String> currMap = flowFile.getAttributes();
- final String curVal = currMap.get(key);
- if (curVal == null || !curVal.equals(value)) {
- continue outer;
- }
- }
- result.put(key, value);
- }
- // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
- result.remove(CoreAttributes.UUID.key());
- return result;
- }
- }
private static class FragmentComparator implements Comparator<FlowFile> {
@@ -1079,8 +991,4 @@ public class MergeContent extends BinFiles {
List<FlowFile> getUnmergedFlowFiles();
}
- private interface AttributeStrategy {
-
- Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
new file mode 100644
index 0000000..b0e3f48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
+import org.apache.nifi.processors.standard.merge.RecordBinManager;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@SideEffectFree
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"merge", "record", "content", "correlation", "stream", "event"})
+@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. "
+ + "This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into "
+ + "a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two "
+ + "FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property "
+ + "is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ + "All FlowFiles with the same value for this attribute will be bundled together."),
+ @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+ + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+ + "in the given bundle."),
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.count", description = "The merged FlowFile will have a 'record.count' attribute indicating the number of records "
+ + "that were written to the FlowFile."),
+ @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"),
+ @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
+ @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
+ @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
+})
+@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
+public class MergeRecord extends AbstractSessionFactoryProcessor {
+ // attributes for defragmentation
+ public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
+
+ public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+ public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+
+ /** Merge Strategy option that fills bins as full as possible; it is the default value of the Merge Strategy property. */
+ public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
+ "Bin-Packing Algorithm",
+ "Bin-Packing Algorithm",
+ "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
+ + "their attributes (if the <Correlation Attribute Name> property is set)");
+ public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
+ "Defragment",
+ "Defragment",
+ "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+ + "have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" "
+ + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. The ordering of "
+ + "the Records that are output is not guaranteed.");
+
+
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for reading incoming data")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
+ .name("merge-strategy")
+ .displayName("Merge Strategy")
+ .description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by "
+ + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+ + "chosen FlowFiles")
+ .required(true)
+ .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
+ .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
+ .build();
+ public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+ .name("correlation-attribute-name")
+ .displayName("Correlation Attribute Name")
+ .description("If specified, two FlowFiles will be binned together only if they have the same value for "
+ + "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
+ .required(false)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .defaultValue(null)
+ .build();
+ /** Required data-size property giving the minimum size of a bin; defaults to "0 B". */
+ public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+ .name("min-bin-size")
+ .displayName("Minimum Bin Size")
+ .description("The minimum size for the bin")
+ .required(true)
+ .defaultValue("0 B")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+ .name("max-bin-size")
+ .displayName("Maximum Bin Size")
+ .description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, "
+ + "all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in last input FlowFile.")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder()
+ .name("min-records")
+ .displayName("Minimum Number of Records")
+ .description("The minimum number of records to include in a bin")
+ .required(true)
+ .defaultValue("1")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+ /** Optional positive-integer property giving the soft upper limit on the number of records in a bin; defaults to 1000. */
+ public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder()
+ .name("max-records")
+ .displayName("Maximum Number of Records")
+ .description("The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, "
+ + "so this limit may be exceeded by up to the number of records in the last input FlowFile.")
+ .required(false)
+ .defaultValue("1000")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
+ .name("max.bin.count")
+ .displayName("Maximum Number of Bins")
+ .description("Specifies the maximum number of bins that can be held in memory at any one time. "
+ + "This number should not be smaller than the maximum number of conurrent threads for this Processor, "
+ + "or the bins that are created will often consist only of a single incoming FlowFile.")
+ .defaultValue("10")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
+ .name("max-bin-age")
+ .displayName("Max Bin Age")
+ .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
+ + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+
+
+ /** Destination of the single FlowFile produced from a completed bin. */
+ public static final Relationship REL_MERGED = new Relationship.Builder()
+     .name("merged")
+     .description("The FlowFile containing the merged records")
+     .build();
+ /** Destination of the input FlowFiles once their records have been merged. */
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+     .name("original")
+     .description("The FlowFiles that were used to create the bundle")
+     .build();
+ /** Destination of the input FlowFiles when binning or merging fails. */
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+     .name("failure")
+     .description("If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure")
+     .build();
+
+ // Bin manager shared by all invocations: onTrigger creates it on first use and resetState clears it when the processor stops.
+ private final AtomicReference<RecordBinManager> binManager = new AtomicReference<>();
+
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER);
+ properties.add(RECORD_WRITER);
+ properties.add(MERGE_STRATEGY);
+ properties.add(CORRELATION_ATTRIBUTE_NAME);
+ properties.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
+ properties.add(MIN_RECORDS);
+ properties.add(MAX_RECORDS);
+ properties.add(MIN_SIZE);
+ properties.add(MAX_SIZE);
+ properties.add(MAX_BIN_AGE);
+ properties.add(MAX_BIN_COUNT);
+ return properties;
+ }
+
+
+ /**
+  * Builds the set of relationships this processor can route FlowFiles to.
+  *
+  * @return a new mutable set containing the original, failure and merged relationships
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     final Set<Relationship> routes = new HashSet<>();
+     for (final Relationship route : new Relationship[] {REL_ORIGINAL, REL_FAILURE, REL_MERGED}) {
+         routes.add(route);
+     }
+     return routes;
+ }
+
+
+ /**
+  * Invoked when the processor is stopped: purges the current bin manager, if one exists,
+  * and then clears the reference so the next onTrigger call builds a fresh manager.
+  */
+ @OnStopped
+ public final void resetState() {
+     // Purge first, then drop the reference - same order as reading, purging and nulling by hand.
+     Optional.ofNullable(binManager.get()).ifPresent(RecordBinManager::purge);
+     binManager.set(null);
+ }
+
+
+ /**
+  * Pulls a batch of FlowFiles from the queue, offers each one to the bin manager, commits the
+  * pulling session, and finally asks the manager to complete any expired bins.
+  *
+  * @param context        supplies the configured property values
+  * @param sessionFactory used to create the session for this invocation and handed to the bin manager
+  * @throws ProcessException declared by the processor contract
+  */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+     // Create the shared manager on first use; if another thread installs one first, adopt that one instead.
+     RecordBinManager manager = binManager.get();
+     while (manager == null) {
+         final RecordBinManager candidate = new RecordBinManager(context, sessionFactory, getLogger());
+         candidate.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+         manager = binManager.compareAndSet(null, candidate) ? candidate : binManager.get();
+     }
+
+     final ProcessSession session = sessionFactory.createSession();
+     final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+     if (getLogger().isDebugEnabled()) {
+         final List<String> pulledIds = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+         getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {pulledIds.size(), pulledIds});
+     }
+
+     // The 'block' flag is passed through to the bin manager: it is set for the Defragment
+     // strategy or whenever a correlation attribute is configured.
+     final boolean block = MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())
+         || context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet();
+
+     try {
+         for (final FlowFile flowFile : flowFiles) {
+             try {
+                 binFlowFile(context, flowFile, session, manager, block);
+             } catch (final Exception e) {
+                 // A FlowFile that cannot be binned goes to failure; the rest of the batch continues.
+                 getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
+                 session.transfer(flowFile, REL_FAILURE);
+             }
+         }
+     } finally {
+         session.commit();
+     }
+
+     try {
+         manager.completeExpiredBins();
+     } catch (final Exception e) {
+         getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+     }
+
+     if (flowFiles.isEmpty()) {
+         getLogger().debug("No FlowFiles to bin; will yield");
+         context.yield();
+     }
+ }
+
+
+ /**
+  * Opens a record reader over the FlowFile's content, derives the FlowFile's group id and hands
+  * the FlowFile together with its reader to the bin manager.
+  *
+  * @param context    supplies the configured Record Reader
+  * @param flowFile   the FlowFile to bin
+  * @param session    the session that currently owns the FlowFile
+  * @param binManager the manager that receives the FlowFile
+  * @param block      passed through unchanged to the bin manager
+  * @throws ProcessException wrapping any MalformedRecordException, IOException or SchemaNotFoundException
+  */
+ private void binFlowFile(final ProcessContext context, final FlowFile flowFile, final ProcessSession session, final RecordBinManager binManager, final boolean block) {
+     final RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+     try (final InputStream content = session.read(flowFile);
+          final RecordReader recordReader = factory.createRecordReader(flowFile, content, getLogger())) {
+
+         final String groupId = getGroupId(context, flowFile, recordReader.getSchema(), session);
+         getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, flowFile});
+
+         binManager.add(groupId, flowFile, recordReader, session, block);
+     } catch (final MalformedRecordException | IOException | SchemaNotFoundException e) {
+         throw new ProcessException(e);
+     }
+ }
+
+
+ /**
+  * Computes the key that decides which bin a FlowFile belongs to.
+  * <p>
+  * With the Defragment strategy the key is the FlowFile's fragment identifier attribute.
+  * Otherwise it is the schema text (taken from the schema if present, else the extracted
+  * Avro schema), with the value of the configured correlation attribute appended when the
+  * FlowFile has one.
+  *
+  * @param context  supplies the merge strategy and correlation attribute name
+  * @param flowFile the FlowFile being binned
+  * @param schema   the schema of the FlowFile's records
+  * @param session  not used by this implementation
+  * @return the group id; may be null in Defragment mode if the fragment identifier attribute is absent
+  */
+ protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
+     if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+         return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
+     }
+
+     // Only fall back to extracting the Avro schema when the schema carries no text of its own.
+     final String schemaText = schema.getSchemaText().orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());
+
+     final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+     if (correlationAttributeName == null) {
+         return schemaText;
+     }
+
+     final String correlationValue = flowFile.getAttribute(correlationAttributeName);
+     return correlationValue == null ? schemaText : schemaText + correlationValue;
+ }
+
+ int getBinCount() {
+ return binManager.get().getBinCount();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
new file mode 100644
index 0000000..e3c9da9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Decides which attributes of the FlowFiles in a bin are carried over to the merged FlowFile.
+ */
+public interface AttributeStrategy {
+ /**
+  * Computes the attributes to apply to the merged FlowFile.
+  *
+  * @param flowFiles the FlowFiles whose attributes are to be combined
+  * @return the attribute names and values selected by this strategy
+  */
+ Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
new file mode 100644
index 0000000..221eb04
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+
+/**
+ * Holds the "Attribute Strategy" property, its two allowable values, and the factory method
+ * that maps the configured value to an {@link AttributeStrategy} implementation.
+ */
+public class AttributeStrategyUtil {
+
+    public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Attributes", "Keep Only Common Attributes",
+        "Any attribute that is not the same on all FlowFiles in a bin will be dropped. Those that are the same across all FlowFiles will be retained.");
+    public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_UNIQUE = new AllowableValue("Keep All Unique Attributes", "Keep All Unique Attributes",
+        "Any attribute that has the same value for all FlowFiles in a bin, or has no value for a FlowFile, will be kept. For example, if a bin consists of 3 FlowFiles "
+            + "and 2 of them have a value of 'hello' for the 'greeting' attribute and the third FlowFile has no 'greeting' attribute then the outbound FlowFile will get "
+            + "a 'greeting' attribute with the value 'hello'.");
+
+    /** Required property selecting the strategy; defaults to keeping only common attributes. */
+    public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
+        .required(true)
+        .name("Attribute Strategy")
+        .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
+            + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
+            + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
+            + "value, will be preserved.")
+        .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
+        .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON.getValue())
+        .build();
+
+
+    /**
+     * Resolves the strategy configured through {@link #ATTRIBUTE_STRATEGY}.
+     *
+     * @param context supplies the configured property value
+     * @return a new strategy instance, or {@code null} when the configured value matches neither allowable value
+     */
+    public static AttributeStrategy strategyFor(ProcessContext context) {
+        final String configured = context.getProperty(ATTRIBUTE_STRATEGY).getValue();
+
+        final AttributeStrategy strategy;
+        if (ATTRIBUTE_STRATEGY_ALL_UNIQUE.getValue().equals(configured)) {
+            strategy = new KeepUniqueAttributeStrategy();
+        } else if (ATTRIBUTE_STRATEGY_ALL_COMMON.getValue().equals(configured)) {
+            strategy = new KeepCommonAttributeStrategy();
+        } else {
+            strategy = null;
+        }
+        return strategy;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
new file mode 100644
index 0000000..5e7920a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+/**
+ * {@link AttributeStrategy} that keeps an attribute only when every FlowFile carries it with
+ * the same non-null value.
+ */
+public class KeepCommonAttributeStrategy implements AttributeStrategy {
+
+    /**
+     * @param flowFiles the FlowFiles to inspect; may be null or empty
+     * @return a new map of the attributes shared by all FlowFiles, never containing the UUID
+     *         attribute; empty when {@code flowFiles} is null or empty
+     */
+    @Override
+    public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
+        final Map<String, String> common = new HashMap<>();
+
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return common;
+        }
+
+        final FlowFile first = flowFiles.iterator().next();
+        if (flowFiles.size() == 1) {
+            // No early return here: execution continues so the UUID removal below still applies.
+            common.putAll(first.getAttributes());
+        }
+
+        // Every candidate comes from the first FlowFile; it survives only if all FlowFiles agree on it.
+        for (final Map.Entry<String, String> candidate : first.getAttributes().entrySet()) {
+            final String key = candidate.getKey();
+            final String expected = candidate.getValue();
+
+            final boolean sharedByAll = flowFiles.stream()
+                .map(flowFile -> flowFile.getAttributes().get(key))
+                .allMatch(actual -> actual != null && actual.equals(expected));
+
+            if (sharedByAll) {
+                common.put(key, expected);
+            }
+        }
+
+        // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
+        common.remove(CoreAttributes.UUID.key());
+        return common;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
new file mode 100644
index 0000000..86fb198
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+/**
+ * {@link AttributeStrategy} that keeps every attribute found on any FlowFile unless two
+ * FlowFiles disagree about its value.
+ */
+public class KeepUniqueAttributeStrategy implements AttributeStrategy {
+
+    /**
+     * @param flowFiles the FlowFiles whose attributes are combined
+     * @return a new map with every attribute that never appeared with two different values,
+     *         never containing the UUID attribute
+     */
+    @Override
+    public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
+        final Map<String, String> kept = new HashMap<>();
+        final Set<String> conflicts = new HashSet<>();
+
+        for (final FlowFile flowFile : flowFiles) {
+            flowFile.getAttributes().forEach((name, value) -> {
+                final String seen = kept.get(name);
+                if (seen == null || seen.equals(value)) {
+                    kept.put(name, value);
+                } else {
+                    // Remember the clash; the entry is dropped only after all FlowFiles were inspected.
+                    conflicts.add(name);
+                }
+            });
+        }
+
+        kept.keySet().removeAll(conflicts);
+
+        // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
+        kept.remove(CoreAttributes.UUID.key());
+        return kept;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
new file mode 100644
index 0000000..0b4cf31
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
+public class RecordBin {
+ public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+ public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+
+ private final ComponentLog logger;
+ private final ProcessSession session;
+ private final RecordSetWriterFactory writerFactory;
+ private final RecordBinThresholds thresholds;
+ private final ProcessContext context;
+
+ private final List<FlowFile> flowFiles = new ArrayList<>();
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+ private final long creationNanos = System.nanoTime();
+
+ private FlowFile merged;
+ private RecordSetWriter recordWriter;
+ private ByteCountingOutputStream out;
+ private int recordCount = 0;
+ private volatile boolean complete = false;
+
+ private static final AtomicLong idGenerator = new AtomicLong(0L);
+ private final long id = idGenerator.getAndIncrement();
+
+
+ public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) {
+ this.session = session;
+ this.writerFactory = context.getProperty(MergeRecord.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ this.logger = logger;
+ this.context = context;
+
+ this.merged = session.create();
+ this.thresholds = thresholds;
+ }
+
+ public boolean isOlderThan(final RecordBin other) {
+ return creationNanos < other.creationNanos;
+ }
+
+ public boolean isOlderThan(final long period, final TimeUnit unit) {
+ final long nanos = unit.toNanos(period);
+ return creationNanos < System.nanoTime() - nanos;
+ }
+
+ public boolean isComplete() {
+ return complete;
+ }
+
+ public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block)
+ throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+ if (isComplete()) {
+ logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
+ return false;
+ }
+
+ final boolean locked;
+ if (block) {
+ writeLock.lock();
+ locked = true;
+ } else {
+ locked = writeLock.tryLock();
+ }
+
+ if (!locked) {
+ logger.debug("RecordBin.offer for id={} returning false because failed to get lock for {}", new Object[] {flowFile.getId(), this});
+ return false;
+ }
+
+ boolean flowFileMigrated = false;
+
+ try {
+ if (isComplete()) {
+ logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
+ return false;
+ }
+
+ logger.debug("Migrating id={} to {}", new Object[] {flowFile.getId(), this});
+
+ Record record;
+ while ((record = recordReader.nextRecord()) != null) {
+ if (recordWriter == null) {
+ final OutputStream rawOut = session.write(merged);
+ logger.debug("Created OutputStream using session {} for {}", new Object[] {session, this});
+
+ this.out = new ByteCountingOutputStream(rawOut);
+
+ recordWriter = writerFactory.createWriter(logger, record.getSchema(), flowFile, out);
+ recordWriter.beginRecordSet();
+ }
+
+ recordWriter.write(record);
+ recordCount++;
+ }
+
+ // This will be closed by the MergeRecord class anyway but we have to close it
+ // here because it needs to be closed before we are able to migrate the FlowFile
+ // to a new Session.
+ recordReader.close();
+ flowFileSession.migrate(this.session, Collections.singleton(flowFile));
+ flowFileMigrated = true;
+ this.flowFiles.add(flowFile);
+
+ if (isFull()) {
+ logger.debug(this + " is now full. Completing bin.");
+ complete("Bin is full");
+ } else if (isOlderThan(thresholds.getMaxBinMillis(), TimeUnit.MILLISECONDS)) {
+ logger.debug(this + " is now expired. Completing bin.");
+ complete("Bin is older than " + thresholds.getMaxBinAge());
+ }
+
+ return true;
+ } catch (final Exception e) {
+ logger.error("Failed to create merged FlowFile from " + (flowFiles.size() + 1) + " input FlowFiles; routing originals to failure", e);
+
+ try {
+ // This will be closed by the MergeRecord class anyway but we have to close it
+ // here because it needs to be closed before we are able to migrate the FlowFile
+ // to a new Session.
+ recordReader.close();
+
+ if (recordWriter != null) {
+ recordWriter.close();
+ }
+ if (this.out != null) {
+ this.out.close();
+ }
+
+ if (!flowFileMigrated) {
+ flowFileSession.migrate(this.session, Collections.singleton(flowFile));
+ this.flowFiles.add(flowFile);
+ }
+ } finally {
+ complete = true;
+ session.remove(merged);
+ session.transfer(flowFiles, MergeRecord.REL_FAILURE);
+ session.commit();
+ }
+
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public boolean isFull() {
+ readLock.lock();
+ try {
+ if (!isFullEnough()) {
+ return false;
+ }
+
+ int maxRecords;
+ final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+ if (recordCountAttribute.isPresent()) {
+ final Optional<String> recordCountValue = flowFiles.stream()
+ .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null)
+ .map(ff -> ff.getAttribute(recordCountAttribute.get()))
+ .findFirst();
+
+ if (!recordCountValue.isPresent()) {
+ return false;
+ }
+
+ try {
+ maxRecords = Integer.parseInt(recordCountValue.get());
+ } catch (final NumberFormatException e) {
+ maxRecords = 1;
+ }
+ } else {
+ maxRecords = thresholds.getMaxRecords();
+ }
+
+ if (recordCount >= maxRecords) {
+ return true;
+ }
+
+ if (out.getBytesWritten() >= thresholds.getMaxBytes()) {
+ return true;
+ }
+
+ return false;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public boolean isFullEnough() {
+ readLock.lock();
+ try {
+ if (flowFiles.isEmpty()) {
+ return false;
+ }
+
+ int requiredRecordCount;
+ final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+ if (recordCountAttribute.isPresent()) {
+ final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
+ try {
+ requiredRecordCount = Integer.parseInt(recordCountValue);
+ } catch (final NumberFormatException e) {
+ requiredRecordCount = 1;
+ }
+ } else {
+ requiredRecordCount = thresholds.getMinRecords();
+ }
+
+ return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+
+ public void rollback() {
+ complete = true;
+ logger.debug("Marked {} as complete because rollback() was called", new Object[] {this});
+
+ writeLock.lock();
+ try {
+ if (recordWriter != null) {
+ try {
+ recordWriter.close();
+ } catch (IOException e) {
+ logger.warn("Failed to close Record Writer", e);
+ }
+ }
+
+ session.rollback();
+
+ if (logger.isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> " id=" + ff.getId() + ",").collect(Collectors.toList());
+ logger.debug("Rolled back bin {} containing input FlowFiles {}", new Object[] {this, ids});
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private long getBinAge() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationNanos);
+ }
+
+ private void fail() {
+ complete = true;
+ logger.debug("Marked {} as complete because fail() was called", new Object[] {this});
+
+ writeLock.lock();
+ try {
+ if (recordWriter != null) {
+ try {
+ recordWriter.close();
+ } catch (IOException e) {
+ logger.warn("Failed to close Record Writer", e);
+ }
+ }
+
+ session.remove(merged);
+ session.transfer(flowFiles, MergeRecord.REL_FAILURE);
+ session.commit();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void complete(final String completionReason) throws IOException {
+ writeLock.lock();
+ try {
+ if (isComplete()) {
+ logger.debug("Cannot complete {} because it is already completed", new Object[] {this});
+ return;
+ }
+
+ complete = true;
+ logger.debug("Marked {} as complete because complete() was called", new Object[] {this});
+
+ final WriteResult writeResult = recordWriter.finishRecordSet();
+ recordWriter.close();
+ logger.debug("Closed Record Writer using session {} for {}", new Object[] {session, this});
+
+ if (flowFiles.isEmpty()) {
+ session.remove(merged);
+ return;
+ }
+
+ // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin.
+ final Optional<String> countAttr = thresholds.getRecordCountAttribute();
+ if (countAttr.isPresent()) {
+ // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value.
+ Integer expectedBinCount = null;
+ for (final FlowFile flowFile : flowFiles) {
+ final String countVal = flowFile.getAttribute(countAttr.get());
+ if (countVal == null) {
+ continue;
+ }
+
+ final int count;
+ try {
+ count = Integer.parseInt(countVal);
+ } catch (final NumberFormatException nfe) {
+ logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number",
+ new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile});
+ fail();
+ return;
+ }
+
+ if (expectedBinCount != null && count != expectedBinCount) {
+ logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}",
+ new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount});
+ fail();
+ return;
+ }
+
+ expectedBinCount = count;
+ }
+
+ if (expectedBinCount == null) {
+ logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles",
+ new Object[] {flowFiles.size(), countAttr.get()});
+ fail();
+ return;
+ }
+
+ if (expectedBinCount != flowFiles.size()) {
+ logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted "
+ + "(due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).",
+ new Object[] {flowFiles.size(), countAttr.get(), expectedBinCount, flowFiles.size(), expectedBinCount});
+ fail();
+ return;
+ }
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
+ final Map<String, String> mergedAttributes = attributeStrategy.getMergedAttributes(flowFiles);
+ attributes.putAll(mergedAttributes);
+
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
+ attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
+ attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
+
+ merged = session.putAllAttributes(merged, attributes);
+
+ session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
+ session.transfer(merged, MergeRecord.REL_MERGED);
+ session.transfer(flowFiles, MergeRecord.REL_ORIGINAL);
+ session.adjustCounter("Records Merged", writeResult.getRecordCount(), false);
+ session.commit();
+
+ if (logger.isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+ logger.debug("Completed bin {} with {} records with Merged FlowFile {} using input FlowFiles {}", new Object[] {this, writeResult.getRecordCount(), merged, ids});
+ }
+ } catch (final Exception e) {
+ session.rollback(true);
+ throw e;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ readLock.lock();
+ try {
+ return "RecordBin[size=" + flowFiles.size() + ", full=" + isFull() + ", isComplete=" + isComplete() + ", id=" + id + "]";
+ } finally {
+ readLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
new file mode 100644
index 0000000..8496a4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.MergeContent;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+
+ /**
+  * Keeps track of the {@link RecordBin}s that FlowFiles are merged into, organized by group identifier.
+  * The map of bins is guarded by {@code lock}; the number of bins currently tracked is kept in {@code binCount}.
+  */
+ public class RecordBinManager {
+
+     private final ProcessContext context;
+     private final ProcessSessionFactory sessionFactory;
+     private final ComponentLog logger;
+     private final int maxBinCount;
+
+     private final AtomicLong maxBinAgeNanos = new AtomicLong(Long.MAX_VALUE);
+     private final Map<String, List<RecordBin>> groupBinMap = new HashMap<>(); // guarded by lock
+     private final Lock lock = new ReentrantLock();
+
+     private final AtomicInteger binCount = new AtomicInteger(0);
+
+     /**
+      * Creates a new manager. The maximum number of bins is read from {@link MergeRecord#MAX_BIN_COUNT};
+      * if that property yields <code>null</code>, {@link Integer#MAX_VALUE} is used.
+      *
+      * @param context the context from which processor properties are read
+      * @param sessionFactory the factory used to create a new session for each newly created bin
+      * @param logger the logger to write debug information to
+      */
+     public RecordBinManager(final ProcessContext context, final ProcessSessionFactory sessionFactory, final ComponentLog logger) {
+         this.context = context;
+         this.sessionFactory = sessionFactory;
+         this.logger = logger;
+
+         final Integer maxBins = context.getProperty(MergeRecord.MAX_BIN_COUNT).asInteger();
+         this.maxBinCount = maxBins == null ? Integer.MAX_VALUE : maxBins.intValue();
+     }
+
+     /**
+      * Rolls back every bin that is currently tracked, forgets all bins and resets the bin count to 0.
+      * Must be called only when there are no active threads modifying the bins.
+      */
+     public void purge() {
+         lock.lock();
+         try {
+             for (final List<RecordBin> binList : groupBinMap.values()) {
+                 for (final RecordBin bin : binList) {
+                     bin.rollback();
+                 }
+             }
+             groupBinMap.clear();
+             binCount.set(0);
+         } finally {
+             lock.unlock();
+         }
+     }
+
+
+     /**
+      * Sets the age that {@link #completeExpiredBins()} compares each bin against.
+      *
+      * @param timePeriod the maximum age, or <code>null</code> to store {@link Long#MAX_VALUE} nanoseconds
+      * @param timeUnit the unit of <code>timePeriod</code>; only used when <code>timePeriod</code> is not null
+      */
+     public void setMaxBinAge(final Long timePeriod, final TimeUnit timeUnit) {
+         if (timePeriod == null) {
+             maxBinAgeNanos.set(Long.MAX_VALUE);
+         } else {
+             maxBinAgeNanos.set(timeUnit.toNanos(timePeriod));
+         }
+     }
+
+
+     /**
+      * @return the number of bins currently being tracked
+      */
+     public int getBinCount() {
+         return binCount.get();
+     }
+
+     /**
+      * Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary.
+      *
+      * @param groupIdentifier the group to which the flow file belongs; can be null
+      * @param flowFile flowFile to bin
+      * @param reader RecordReader to use for reading FlowFile
+      * @param session the ProcessSession to which the FlowFiles belong
+      * @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
+      *            that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
+      *
+      * @throws SchemaNotFoundException if unable to find the schema for the record writer
+      * @throws MalformedRecordException if unable to read a record
+      * @throws IOException if there is an IO problem reading from the stream or writing to the stream
+      */
+     public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
+         throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+         final List<RecordBin> currentBins;
+         lock.lock();
+         try {
+             // Create a new List<RecordBin> if none exists for this Group ID. We use a CopyOnWriteArrayList here because
+             // we need to traverse the list in a couple of places and just below here, we call bin.offer() (which is very expensive)
+             // while traversing the List, so we don't want to do this within a synchronized block. If we end up seeing poor performance
+             // from this, we could look at instead using a Synchronized List and instead of calling bin.offer() while iterating allow for some
+             // sort of bin.tryLock() and have that lock only if the flowfile should be added. Then if it returns true, we can stop iterating
+             // and perform the expensive part and then ensure that we always unlock
+             currentBins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
+         } finally {
+             lock.unlock();
+         }
+
+         RecordBin acceptedBin = null;
+         for (final RecordBin bin : currentBins) {
+             final boolean accepted = bin.offer(flowFile, reader, session, block);
+
+             if (accepted) {
+                 acceptedBin = bin;
+                 logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), bin});
+                 break;
+             }
+         }
+
+         // We remove a completed bin only after the for-loop above has finished, so that the
+         // list of bins is not modified while we are still iterating over it.
+         if (acceptedBin != null) {
+             if (acceptedBin.isComplete()) {
+                 removeBins(groupIdentifier, Collections.singletonList(acceptedBin));
+             }
+
+             return;
+         }
+
+         // if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
+         final RecordBin bin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds());
+         final boolean binAccepted = bin.offer(flowFile, reader, session, true);
+         if (!binAccepted) {
+             session.rollback();
+             throw new RuntimeException("Attempted to add " + flowFile + " to a new bin but failed. This is unexpected. Will roll back session and try again.");
+         }
+
+         logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), bin});
+
+         // A bin that is already complete after a single offer does not need to be tracked.
+         if (!bin.isComplete()) {
+             final int updatedBinCount = binCount.incrementAndGet();
+
+             lock.lock();
+             try {
+                 // We have already obtained the list of RecordBins from this Map above. However, we released
+                 // the lock in order to avoid blocking while writing to a Bin. Because of this, it is possible
+                 // that another thread may have already come in and removed this List from the Map, if all
+                 // Bins in the List have been completed. As a result, we must now obtain the lock again
+                 // and obtain the List (or a new one), and then update that. This ensures that we never lose
+                 // track of a Bin. If we don't do this, we could completely lose a Bin.
+                 final List<RecordBin> bins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
+                 bins.add(bin);
+             } finally {
+                 lock.unlock();
+             }
+
+             if (updatedBinCount > maxBinCount) {
+                 completeOldestBin();
+             }
+         }
+     }
+
+
+     /**
+      * Builds the thresholds for a new bin from the processor's properties. An unset Max Size or
+      * Max Bin Age is represented as {@link Long#MAX_VALUE}. The record count attribute is only
+      * provided when the Merge Strategy is Defragment; otherwise it is <code>null</code>.
+      *
+      * @return the thresholds to use for a newly created bin
+      */
+     private RecordBinThresholds createThresholds() {
+         final int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
+         final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
+         final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
+
+         final PropertyValue maxSizeValue = context.getProperty(MergeRecord.MAX_SIZE);
+         final long maxBytes = maxSizeValue.isSet() ? maxSizeValue.asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
+
+         final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
+         final String maxBinAge = maxMillisValue.getValue();
+         final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE;
+
+         final String recordCountAttribute;
+         final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
+         if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+             recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+         } else {
+             recordCountAttribute = null;
+         }
+
+         return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+     }
+
+
+     /**
+      * Finds the oldest bin across all groups, stops tracking it and then completes it.
+      * Does nothing if no bins are being tracked.
+      *
+      * @throws IOException if completing the bin fails with an IO problem
+      */
+     public void completeOldestBin() throws IOException {
+         RecordBin oldestBin = null;
+
+         lock.lock();
+         try {
+             String oldestBinGroup = null;
+
+             for (final Map.Entry<String, List<RecordBin>> group : groupBinMap.entrySet()) {
+                 for (final RecordBin bin : group.getValue()) {
+                     if (oldestBin == null || bin.isOlderThan(oldestBin)) {
+                         oldestBin = bin;
+                         oldestBinGroup = group.getKey();
+                     }
+                 }
+             }
+
+             if (oldestBin == null) {
+                 return;
+             }
+
+             removeBins(oldestBinGroup, Collections.singletonList(oldestBin));
+         } finally {
+             lock.unlock();
+         }
+
+         // The bin is completed only after the lock has been released.
+         logger.debug("Completing Bin {} because the maximum number of bins has been exceeded", new Object[] {oldestBin});
+         oldestBin.complete("Maximum number of bins has been exceeded");
+     }
+
+
+     /**
+      * Completes, and then stops tracking, every bin that is older than the configured maximum bin age.
+      * The expired bins are collected while holding the lock and completed after it has been released.
+      *
+      * @throws IOException if completing a bin fails with an IO problem
+      */
+     public void completeExpiredBins() throws IOException {
+         final long maxNanos = maxBinAgeNanos.get();
+         final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
+
+         lock.lock();
+         try {
+             for (final Map.Entry<String, List<RecordBin>> entry : groupBinMap.entrySet()) {
+                 final String key = entry.getKey();
+                 final List<RecordBin> bins = entry.getValue();
+
+                 for (final RecordBin bin : bins) {
+                     if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+                         final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
+                         expiredBinsForKey.add(bin);
+                     }
+                 }
+             }
+         } finally {
+             lock.unlock();
+         }
+
+         for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
+             final String key = entry.getKey();
+             final List<RecordBin> expiredBins = entry.getValue();
+
+             for (final RecordBin bin : expiredBins) {
+                 // Pass the bin so that the {} placeholder in the message is actually filled in.
+                 logger.debug("Completing Bin {} because it has expired", new Object[] {bin});
+                 bin.complete("Bin has reached Max Bin Age");
+             }
+
+             removeBins(key, expiredBins);
+         }
+     }
+
+     /**
+      * Stops tracking the given bins for the given group, decrements the bin count by the number of bins
+      * actually removed and drops the group's list from the map once it is empty.
+      *
+      * @param key the group identifier that the bins belong to
+      * @param bins the bins to stop tracking
+      */
+     private void removeBins(final String key, final List<RecordBin> bins) {
+         lock.lock();
+         try {
+             final List<RecordBin> list = groupBinMap.get(key);
+             if (list != null) {
+                 final int initialSize = list.size();
+                 list.removeAll(bins);
+
+                 // Determine how many items were removed from the list and
+                 // update our binCount to keep track of this.
+                 final int removedCount = initialSize - list.size();
+                 binCount.addAndGet(-removedCount);
+
+                 if (list.isEmpty()) {
+                     groupBinMap.remove(key);
+                 }
+             }
+         } finally {
+             lock.unlock();
+         }
+     }
+ }