Posted to commits@nifi.apache.org by ma...@apache.org on 2017/07/12 20:58:12 UTC

[2/2] nifi git commit: NIFI-4060: Initial implementation of MergeRecord

NIFI-4060: Initial implementation of MergeRecord

NIFI-4060: Addressed a threading issue with RecordBin being updated after it is completed; fixed an issue that caused the mime.type attribute not to be written properly when the incoming FlowFiles already have differing values for that attribute

NIFI-4060: Bug fixes; improved documentation; added a lot of debug information; updated StandardProcessSession to produce more accurate logs in case of a session being committed/rolled back with open input/output streams
Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1958


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b603cb95
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b603cb95
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b603cb95

Branch: refs/heads/master
Commit: b603cb955dcd1d3d9b5e374e5760f2f9b047bda9
Parents: eefad29
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jun 26 13:15:03 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Jul 12 16:36:48 2017 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/bin/BinFiles.java       |  14 +-
 .../record/CommaSeparatedRecordReader.java      | 102 +++++
 .../repository/StandardProcessSession.java      |  19 +-
 .../nifi/processors/standard/MergeContent.java  | 104 +----
 .../nifi/processors/standard/MergeRecord.java   | 358 ++++++++++++++++
 .../standard/merge/AttributeStrategy.java       |  27 ++
 .../standard/merge/AttributeStrategyUtil.java   |  56 +++
 .../merge/KeepCommonAttributeStrategy.java      |  64 +++
 .../merge/KeepUniqueAttributeStrategy.java      |  58 +++
 .../processors/standard/merge/RecordBin.java    | 424 +++++++++++++++++++
 .../standard/merge/RecordBinManager.java        | 295 +++++++++++++
 .../standard/merge/RecordBinThresholds.java     |  69 +++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 229 ++++++++++
 .../processors/standard/TestMergeContent.java   |   9 +-
 .../processors/standard/TestMergeRecord.java    | 360 ++++++++++++++++
 16 files changed, 2076 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index b15d23b..7f79b70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -131,9 +131,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
      *
      * @param context context
      * @param flowFile flowFile
+     * @param session the session for accessing the FlowFile
      * @return The appropriate group ID
      */
-    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
+    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session);
 
     /**
      * Performs any additional setup of the bin manager. Called during the OnScheduled phase.
@@ -271,8 +272,15 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
             final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
             for (FlowFile flowFile : flowFiles) {
                 flowFile = this.preprocessFlowFile(context, session, flowFile);
-                final String groupingIdentifier = getGroupId(context, flowFile);
-                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
+
+                try {
+                    final String groupingIdentifier = getGroupId(context, flowFile, session);
+                    flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
+                } catch (final Exception e) {
+                    getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e);
+                    session.transfer(flowFile, REL_FAILURE);
+                    continue;
+                }
             }
 
             for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
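
The new three-argument getGroupId gives subclasses access to the ProcessSession, so a bin id can be derived from FlowFile content rather than attributes alone, and any exception thrown while grouping now routes that FlowFile to failure instead of failing the whole batch. A minimal sketch of a subclass using the new signature (ContentGroupedBinFiles and computeGroupFrom are hypothetical, not part of this commit):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.util.bin.BinFiles;

    public abstract class ContentGroupedBinFiles extends BinFiles {

        @Override
        protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
            // The session makes content-based grouping possible; if this throws,
            // the loop above catches it and sends the FlowFile to REL_FAILURE.
            try (final InputStream in = session.read(flowFile)) {
                return computeGroupFrom(in); // hypothetical content-derived group id
            } catch (final IOException e) {
                throw new ProcessException(e);
            }
        }

        protected abstract String computeGroupFrom(InputStream in) throws IOException;
    }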

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
new file mode 100644
index 0000000..8973055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+
+public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
+    private int failAfterN;
+    private int recordCount = 0;
+
+    public CommaSeparatedRecordReader() {
+        this(-1);
+    }
+
+    public CommaSeparatedRecordReader(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    public void failAfter(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    @Override
+    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+
+        final List<RecordField> fields = new ArrayList<>();
+
+        final String headerLine = reader.readLine();
+        for (final String colName : headerLine.split(",")) {
+            fields.add(new RecordField(colName.trim(), RecordFieldType.STRING.getDataType()));
+        }
+
+        return new RecordReader() {
+
+            @Override
+            public void close() throws IOException {
+                reader.close();
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, MalformedRecordException {
+                if (failAfterN > -1 && recordCount >= failAfterN) {
+                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+                }
+
+                final String nextLine = reader.readLine();
+                if (nextLine == null) {
+                    return null;
+                }
+
+                recordCount++;
+
+                final String[] values = nextLine.split(",");
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++].trim());
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+}
\ No newline at end of file
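
As a quick illustration, the sketch below feeds the mock reader a small CSV and iterates its Records; it assumes standalone use, passing null for the FlowFile and logger arguments since this mock ignores both:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
    import org.apache.nifi.serialization.record.Record;

    public class CsvReaderSketch {
        public static void main(final String[] args) throws Exception {
            final CommaSeparatedRecordReader factory = new CommaSeparatedRecordReader();
            final InputStream in = new ByteArrayInputStream(
                "Name, Age\nJohn, 35\nJane, 34".getBytes(StandardCharsets.UTF_8));

            // The first line becomes the schema; header names and values are trimmed.
            try (final RecordReader reader = factory.createRecordReader(null, in, null)) {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    System.out.println(record.getValue("Name") + " is " + record.getValue("Age"));
                }
            }
        }
    }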

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index d34d8cf..d2a6af6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -191,13 +191,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         processingStartTime = System.nanoTime();
     }
 
-    private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap) {
+    private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final String action, final String streamType) {
         final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the map
         for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) {
             final FlowFile flowFile = entry.getKey();
             final Closeable openStream = entry.getValue();
 
-            LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile);
+            LOG.warn("{} closing {} for {} because the session was {} without the {} stream being closed.", this, openStream, flowFile, action, streamType);
 
             try {
                 openStream.close();
@@ -212,8 +212,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         resetWriteClaims(false);
 
-        closeStreams(openInputStreams);
-        closeStreams(openOutputStreams);
+        closeStreams(openInputStreams, "committed", "input");
+        closeStreams(openOutputStreams, "committed", "output");
 
         if (!readRecursionSet.isEmpty()) {
             throw new IllegalStateException();
@@ -914,8 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         deleteOnCommit.clear();
 
-        closeStreams(openInputStreams);
-        closeStreams(openOutputStreams);
+        closeStreams(openInputStreams, "rolled back", "input");
+        closeStreams(openOutputStreams, "rolled back", "output");
 
         try {
             claimCache.reset();
@@ -2171,7 +2171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
         }
 
-        final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
+        final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
         final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
         final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
         final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
@@ -2470,7 +2470,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     final long bytesWritten = countingOut.getBytesWritten();
                     StandardProcessSession.this.bytesWritten += bytesWritten;
 
-                    openOutputStreams.remove(sourceFlowFile);
+                    final OutputStream removed = openOutputStreams.remove(sourceFlowFile);
+                    if (removed == null) {
+                        LOG.error("Closed Session's OutputStream but there was no entry for it in the map; sourceFlowFile={}; map={}", sourceFlowFile, openOutputStreams);
+                    }
 
                     flush();
                     removeTemporaryClaim(record);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 3401d66..edbc033 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -82,6 +82,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.bin.Bin;
 import org.apache.nifi.processor.util.bin.BinFiles;
 import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processors.standard.merge.AttributeStrategy;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFilePackager;
@@ -126,7 +128,7 @@ import org.apache.nifi.util.FlowFilePackagerV3;
     @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
     @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
         + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
-@SeeAlso(SegmentContent.class)
+@SeeAlso({SegmentContent.class, MergeRecord.class})
 public class MergeContent extends BinFiles {
 
     // preferred attributes
@@ -201,8 +203,6 @@ public class MergeContent extends BinFiles {
             MERGE_FORMAT_AVRO_VALUE,
             "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");
 
-    public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes";
-    public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes";
 
     public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
     public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
@@ -224,16 +224,6 @@ public class MergeContent extends BinFiles {
             .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
             .defaultValue(MERGE_FORMAT_CONCAT.getValue())
             .build();
-    public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
-            .required(true)
-            .name("Attribute Strategy")
-            .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
-                    + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
-                    + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
-                    + "value, will be preserved.")
-            .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
-            .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON)
-            .build();
 
     public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Correlation Attribute Name")
@@ -315,7 +305,7 @@ public class MergeContent extends BinFiles {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(MERGE_STRATEGY);
         descriptors.add(MERGE_FORMAT);
-        descriptors.add(ATTRIBUTE_STRATEGY);
+        descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
         descriptors.add(CORRELATION_ATTRIBUTE_NAME);
         descriptors.add(MIN_ENTRIES);
         descriptors.add(MAX_ENTRIES);
@@ -378,7 +368,7 @@ public class MergeContent extends BinFiles {
     }
 
     @Override
-    protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
         final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME)
                 .evaluateAttributeExpressions(flowFile).getValue();
         String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
@@ -429,16 +419,7 @@ public class MergeContent extends BinFiles {
                 throw new AssertionError();
         }
 
-        final AttributeStrategy attributeStrategy;
-        switch (context.getProperty(ATTRIBUTE_STRATEGY).getValue()) {
-            case ATTRIBUTE_STRATEGY_ALL_UNIQUE:
-                attributeStrategy = new KeepUniqueAttributeStrategy();
-                break;
-            case ATTRIBUTE_STRATEGY_ALL_COMMON:
-            default:
-                attributeStrategy = new KeepCommonAttributeStrategy();
-                break;
-        }
+        final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
 
         final List<FlowFile> contents = bin.getContents();
         final ProcessSession binSession = bin.getSession();
@@ -989,76 +970,7 @@ public class MergeContent extends BinFiles {
         }
     }
 
-    private static class KeepUniqueAttributeStrategy implements AttributeStrategy {
-
-        @Override
-        public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
-            final Map<String, String> newAttributes = new HashMap<>();
-            final Set<String> conflicting = new HashSet<>();
-
-            for (final FlowFile flowFile : flowFiles) {
-                for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) {
-                    final String name = attributeEntry.getKey();
-                    final String value = attributeEntry.getValue();
-
-                    final String existingValue = newAttributes.get(name);
-                    if (existingValue != null && !existingValue.equals(value)) {
-                        conflicting.add(name);
-                    } else {
-                        newAttributes.put(name, value);
-                    }
-                }
-            }
-
-            for (final String attributeToRemove : conflicting) {
-                newAttributes.remove(attributeToRemove);
-            }
-
-            // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
-            newAttributes.remove(CoreAttributes.UUID.key());
-            return newAttributes;
-        }
-    }
-
-    private static class KeepCommonAttributeStrategy implements AttributeStrategy {
-
-        @Override
-        public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
-            final Map<String, String> result = new HashMap<>();
-
-            //trivial cases
-            if (flowFiles == null || flowFiles.isEmpty()) {
-                return result;
-            } else if (flowFiles.size() == 1) {
-                result.putAll(flowFiles.iterator().next().getAttributes());
-            }
-
-            /*
-             * Start with the first attribute map and only put an entry to the
-             * resultant map if it is common to every map.
-             */
-            final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes();
-
-            outer:
-            for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
-                final String key = mapEntry.getKey();
-                final String value = mapEntry.getValue();
-
-                for (final FlowFile flowFile : flowFiles) {
-                    final Map<String, String> currMap = flowFile.getAttributes();
-                    final String curVal = currMap.get(key);
-                    if (curVal == null || !curVal.equals(value)) {
-                        continue outer;
-                    }
-                }
-                result.put(key, value);
-            }
 
-            // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
-            result.remove(CoreAttributes.UUID.key());
-            return result;
-        }
-    }
 
     private static class FragmentComparator implements Comparator<FlowFile> {
 
@@ -1079,8 +991,4 @@ public class MergeContent extends BinFiles {
         List<FlowFile> getUnmergedFlowFiles();
     }
 
-    private interface AttributeStrategy {
-
-        Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
new file mode 100644
index 0000000..b0e3f48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
+import org.apache.nifi.processors.standard.merge.RecordBinManager;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@SideEffectFree
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"merge", "record", "content", "correlation", "stream", "event"})
+@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. "
+    + "This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into "
+    + "a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two "
+    + "FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property "
+    + "is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.")
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+        + "All FlowFiles with the same value for this attribute will be bundled together."),
+    @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+        + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+        + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+        + "in the given bundle."),
+})
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The merged FlowFile will have a 'record.count' attribute indicating the number of records "
+        + "that were written to the FlowFile."),
+    @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"),
+    @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
+    @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+        + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
+    @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
+})
+@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
+public class MergeRecord extends AbstractSessionFactoryProcessor {
+    // attributes for defragmentation
+    public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
+
+    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+
+    public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
+        "Bin-Packing Algorithm",
+        "Bin-Packing Algorithm",
+        "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
+            + "their attributes (if the <Correlation Attribute> property is set)");
+    public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
+        "Defragment",
+        "Defragment",
+        "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+            + "have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" "
+            + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. The ordering of "
+            + "the Records that are output is not guaranteed.");
+
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
+        .name("merge-strategy")
+        .displayName("Merge Strategy")
+        .description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by "
+            + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+            + "chosen FlowFiles")
+        .required(true)
+        .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
+        .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
+        .build();
+    public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("correlation-attribute-name")
+        .displayName("Correlation Attribute Name")
+        .description("If specified, two FlowFiles will be binned together only if they have the same value for "
+            + "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
+        .required(false)
+        .expressionLanguageSupported(false)
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+        .defaultValue(null)
+        .build();
+    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+        .name("min-bin-size")
+        .displayName("Minimum Bin Size")
+        .description("The minimum size of for the bin")
+        .required(true)
+        .defaultValue("0 B")
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+        .name("max-bin-size")
+        .displayName("Maximum Bin Size")
+        .description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, "
+            + "all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in last input FlowFile.")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder()
+        .name("min-records")
+        .displayName("Minimum Number of Records")
+        .description("The minimum number of records to include in a bin")
+        .required(true)
+        .defaultValue("1")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder()
+        .name("max-records")
+        .displayName("Maximum Number of Records")
+        .description("The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFIle is added to a bin, all records in that FlowFile will be added, "
+            + "so this limit may be exceeded by up to the number of records in the last input FlowFile.")
+        .required(false)
+        .defaultValue("1000")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
+        .name("max.bin.count")
+        .displayName("Maximum Number of Bins")
+        .description("Specifies the maximum number of bins that can be held in memory at any one time. "
+            + "This number should not be smaller than the maximum number of conurrent threads for this Processor, "
+            + "or the bins that are created will often consist only of a single incoming FlowFile.")
+        .defaultValue("10")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
+        .name("max-bin-age")
+        .displayName("Max Bin Age")
+        .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
+            + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
+        .required(false)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+
+
+
+    public static final Relationship REL_MERGED = new Relationship.Builder()
+        .name("merged")
+        .description("The FlowFile containing the merged records")
+        .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("The FlowFiles that were used to create the bundle")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
+        .build();
+
+    private final AtomicReference<RecordBinManager> binManager = new AtomicReference<>();
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(MERGE_STRATEGY);
+        properties.add(CORRELATION_ATTRIBUTE_NAME);
+        properties.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
+        properties.add(MIN_RECORDS);
+        properties.add(MAX_RECORDS);
+        properties.add(MIN_SIZE);
+        properties.add(MAX_SIZE);
+        properties.add(MAX_BIN_AGE);
+        properties.add(MAX_BIN_COUNT);
+        return properties;
+    }
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_MERGED);
+        return relationships;
+    }
+
+
+    @OnStopped
+    public final void resetState() {
+        final RecordBinManager manager = binManager.get();
+        if (manager != null) {
+            manager.purge();
+        }
+        binManager.set(null);
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        RecordBinManager manager = binManager.get();
+        while (manager == null) {
+            manager = new RecordBinManager(context, sessionFactory, getLogger());
+            manager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+            final boolean updated = binManager.compareAndSet(null, manager);
+            if (!updated) {
+                manager = binManager.get();
+            }
+        }
+
+        final ProcessSession session = sessionFactory.createSession();
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+        if (getLogger().isDebugEnabled()) {
+            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
+        }
+
+        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+        final boolean block;
+        if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
+            block = true;
+        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+            block = true;
+        } else {
+            block = false;
+        }
+
+        try {
+            for (final FlowFile flowFile : flowFiles) {
+                try {
+                    binFlowFile(context, flowFile, session, manager, block);
+                } catch (final Exception e) {
+                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+        } finally {
+            session.commit();
+        }
+
+        try {
+            manager.completeExpiredBins();
+        } catch (final Exception e) {
+            getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+        }
+
+        if (flowFiles.isEmpty()) {
+            getLogger().debug("No FlowFiles to bin; will yield");
+            context.yield();
+        }
+    }
+
+
+    private void binFlowFile(final ProcessContext context, final FlowFile flowFile, final ProcessSession session, final RecordBinManager binManager, final boolean block) {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        try (final InputStream in = session.read(flowFile);
+            final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            final RecordSchema schema = reader.getSchema();
+
+            final String groupId = getGroupId(context, flowFile, schema, session);
+            getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, flowFile});
+
+            binManager.add(groupId, flowFile, reader, session, block);
+        } catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
+        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+        if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) {
+            return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
+        }
+
+        final Optional<String> optionalText = schema.getSchemaText();
+        final String schemaText = optionalText.isPresent() ? optionalText.get() : AvroTypeUtil.extractAvroSchema(schema).toString();
+
+        final String groupId;
+        final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+        if (correlationAttributeName != null) {
+            final String correlationAttr = flowFile.getAttribute(correlationAttributeName);
+            groupId = correlationAttr == null ? schemaText : schemaText + correlationAttr;
+        } else {
+            groupId = schemaText;
+        }
+
+        return groupId;
+    }
+
+    int getBinCount() {
+        return binManager.get().getBinCount();
+    }
+}
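
To see the processor end to end, here is a sketch in the style of the new TestMergeRecord (the actual test is not reproduced in this diff, so the property values, run count and assertions below are illustrative):

    import org.apache.nifi.processors.standard.MergeRecord;
    import org.apache.nifi.reporting.InitializationException;
    import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
    import org.apache.nifi.serialization.record.MockRecordWriter;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class MergeRecordSketchTest {

        @Test
        public void mergesTwoLikeFlowFiles() throws InitializationException {
            final TestRunner runner = TestRunners.newTestRunner(new MergeRecord());

            final CommaSeparatedRecordReader reader = new CommaSeparatedRecordReader();
            final MockRecordWriter writer = new MockRecordWriter("header");
            runner.addControllerService("reader", reader);
            runner.enableControllerService(reader);
            runner.addControllerService("writer", writer);
            runner.enableControllerService(writer);

            runner.setProperty(MergeRecord.RECORD_READER, "reader");
            runner.setProperty(MergeRecord.RECORD_WRITER, "writer");
            runner.setProperty(MergeRecord.MIN_RECORDS, "2"); // hold the bin open for both inputs

            runner.enqueue("Name, Age\nJohn, 35".getBytes());
            runner.enqueue("Name, Age\nJane, 34".getBytes());
            runner.run(2);

            // Both FlowFiles share a schema, so they land in one bin and merge into one output.
            runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
            runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
        }
    }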

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
new file mode 100644
index 0000000..e3c9da9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface AttributeStrategy {
+    Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
new file mode 100644
index 0000000..221eb04
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+
+public class AttributeStrategyUtil {
+
+    public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Attributes", "Keep Only Common Attributes",
+        "Any attribute that is not the same on all FlowFiles in a bin will be dropped. Those that are the same across all FlowFiles will be retained.");
+    public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_UNIQUE = new AllowableValue("Keep All Unique Attributes", "Keep All Unique Attributes",
+        "Any attribute that has the same value for all FlowFiles in a bin, or has no value for a FlowFile, will be kept. For example, if a bin consists of 3 FlowFiles "
+            + "and 2 of them have a value of 'hello' for the 'greeting' attribute and the third FlowFile has no 'greeting' attribute then the outbound FlowFile will get "
+            + "a 'greeting' attribute with the value 'hello'.");
+
+    public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
+        .required(true)
+        .name("Attribute Strategy")
+        .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
+            + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
+            + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
+            + "value, will be preserved.")
+        .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
+        .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON.getValue())
+        .build();
+
+
+    public static AttributeStrategy strategyFor(ProcessContext context) {
+        final String strategyName = context.getProperty(ATTRIBUTE_STRATEGY).getValue();
+        if (ATTRIBUTE_STRATEGY_ALL_UNIQUE.getValue().equals(strategyName)) {
+            return new KeepUniqueAttributeStrategy();
+        }
+        if (ATTRIBUTE_STRATEGY_ALL_COMMON.getValue().equals(strategyName)) {
+            return new KeepCommonAttributeStrategy();
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
new file mode 100644
index 0000000..5e7920a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+public class KeepCommonAttributeStrategy implements AttributeStrategy {
+
+    @Override
+    public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
+        final Map<String, String> result = new HashMap<>();
+
+        //trivial cases
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return result;
+        } else if (flowFiles.size() == 1) {
+            result.putAll(flowFiles.iterator().next().getAttributes());
+        }
+
+        /*
+         * Start with the first attribute map and only put an entry to the
+         * resultant map if it is common to every map.
+         */
+        final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes();
+
+        outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+            final String key = mapEntry.getKey();
+            final String value = mapEntry.getValue();
+
+            for (final FlowFile flowFile : flowFiles) {
+                final Map<String, String> currMap = flowFile.getAttributes();
+                final String curVal = currMap.get(key);
+                if (curVal == null || !curVal.equals(value)) {
+                    continue outer;
+                }
+            }
+            result.put(key, value);
+        }
+
+        // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
+        result.remove(CoreAttributes.UUID.key());
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
new file mode 100644
index 0000000..86fb198
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+public class KeepUniqueAttributeStrategy implements AttributeStrategy {
+
+    @Override
+    public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
+        final Map<String, String> newAttributes = new HashMap<>();
+        final Set<String> conflicting = new HashSet<>();
+
+        for (final FlowFile flowFile : flowFiles) {
+            for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) {
+                final String name = attributeEntry.getKey();
+                final String value = attributeEntry.getValue();
+
+                final String existingValue = newAttributes.get(name);
+                if (existingValue != null && !existingValue.equals(value)) {
+                    conflicting.add(name);
+                } else {
+                    newAttributes.put(name, value);
+                }
+            }
+        }
+
+        for (final String attributeToRemove : conflicting) {
+            newAttributes.remove(attributeToRemove);
+        }
+
+        // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
+        newAttributes.remove(CoreAttributes.UUID.key());
+        return newAttributes;
+    }
+}
\ No newline at end of file
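
A small sketch contrasting the two strategies with nifi-mock's MockFlowFile (the attribute names are made up; note that MockFlowFile also seeds default attributes such as uuid and filename, which follow the same rules):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processors.standard.merge.KeepCommonAttributeStrategy;
    import org.apache.nifi.processors.standard.merge.KeepUniqueAttributeStrategy;
    import org.apache.nifi.util.MockFlowFile;

    public class AttributeStrategySketch {
        public static void main(final String[] args) {
            final Map<String, String> firstAttrs = new HashMap<>();
            firstAttrs.put("greeting", "hello");
            firstAttrs.put("color", "blue");
            final MockFlowFile first = new MockFlowFile(1L);
            first.putAttributes(firstAttrs);

            final Map<String, String> secondAttrs = new HashMap<>();
            secondAttrs.put("greeting", "hello");
            secondAttrs.put("transit", "bus");
            final MockFlowFile second = new MockFlowFile(2L);
            second.putAttributes(secondAttrs);

            final List<FlowFile> bin = Arrays.asList(first, second);

            // Keep Only Common Attributes: an entry survives only if every FlowFile has it
            // with the same value, so 'greeting' is kept; 'color' and 'transit' are dropped.
            final Map<String, String> common = new KeepCommonAttributeStrategy().getMergedAttributes(bin);

            // Keep All Unique Attributes: missing entries are fine as long as no values
            // conflict, so 'greeting', 'color' and 'transit' are all kept.
            final Map<String, String> unique = new KeepUniqueAttributeStrategy().getMergedAttributes(bin);

            System.out.println("common = " + common);
            System.out.println("unique = " + unique);
        }
    }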

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
new file mode 100644
index 0000000..0b4cf31
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
+public class RecordBin {
+    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+
+    private final ComponentLog logger;
+    private final ProcessSession session;
+    private final RecordSetWriterFactory writerFactory;
+    private final RecordBinThresholds thresholds;
+    private final ProcessContext context;
+
+    private final List<FlowFile> flowFiles = new ArrayList<>();
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+    private final long creationNanos = System.nanoTime();
+
+    private FlowFile merged;
+    private RecordSetWriter recordWriter;
+    private ByteCountingOutputStream out;
+    private int recordCount = 0;
+    private volatile boolean complete = false;
+
+    private static final AtomicLong idGenerator = new AtomicLong(0L);
+    private final long id = idGenerator.getAndIncrement();
+
+
+    public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) {
+        this.session = session;
+        this.writerFactory = context.getProperty(MergeRecord.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        this.logger = logger;
+        this.context = context;
+
+        this.merged = session.create();
+        this.thresholds = thresholds;
+    }
+
+    public boolean isOlderThan(final RecordBin other) {
+        return creationNanos < other.creationNanos;
+    }
+
+    public boolean isOlderThan(final long period, final TimeUnit unit) {
+        final long nanos = unit.toNanos(period);
+        return System.nanoTime() - creationNanos > nanos;
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block)
+        throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+        if (isComplete()) {
+            logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
+            return false;
+        }
+
+        final boolean locked;
+        if (block) {
+            writeLock.lock();
+            locked = true;
+        } else {
+            locked = writeLock.tryLock();
+        }
+
+        if (!locked) {
+            logger.debug("RecordBin.offer for id={} returning false because failed to get lock for {}", new Object[] {flowFile.getId(), this});
+            return false;
+        }
+
+        boolean flowFileMigrated = false;
+
+        try {
+            if (isComplete()) {
+                logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
+                return false;
+            }
+
+            logger.debug("Migrating id={} to {}", new Object[] {flowFile.getId(), this});
+
+            Record record;
+            while ((record = recordReader.nextRecord()) != null) {
+                if (recordWriter == null) {
+                    final OutputStream rawOut = session.write(merged);
+                    logger.debug("Created OutputStream using session {} for {}", new Object[] {session, this});
+
+                    this.out = new ByteCountingOutputStream(rawOut);
+
+                    recordWriter = writerFactory.createWriter(logger, record.getSchema(), flowFile, out);
+                    recordWriter.beginRecordSet();
+                }
+
+                recordWriter.write(record);
+                recordCount++;
+            }
+
+            // This will be closed by the MergeRecord class anyway but we have to close it
+            // here because it needs to be closed before we are able to migrate the FlowFile
+            // to a new Session.
+            recordReader.close();
+            flowFileSession.migrate(this.session, Collections.singleton(flowFile));
+            flowFileMigrated = true;
+            this.flowFiles.add(flowFile);
+
+            if (isFull()) {
+                logger.debug(this + " is now full. Completing bin.");
+                complete("Bin is full");
+            } else if (isOlderThan(thresholds.getMaxBinMillis(), TimeUnit.MILLISECONDS)) {
+                logger.debug(this + " is now expired. Completing bin.");
+                complete("Bin is older than " + thresholds.getMaxBinAge());
+            }
+
+            return true;
+        } catch (final Exception e) {
+            logger.error("Failed to create merged FlowFile from " + (flowFiles.size() + 1) + " input FlowFiles; routing originals to failure", e);
+
+            try {
+                // This will be closed by the MergeRecord class anyway but we have to close it
+                // here because it needs to be closed before we are able to migrate the FlowFile
+                // to a new Session.
+                recordReader.close();
+
+                if (recordWriter != null) {
+                    recordWriter.close();
+                }
+                if (this.out != null) {
+                    this.out.close();
+                }
+
+                if (!flowFileMigrated) {
+                    flowFileSession.migrate(this.session, Collections.singleton(flowFile));
+                    this.flowFiles.add(flowFile);
+                }
+            } finally {
+                complete = true;
+                session.remove(merged);
+                session.transfer(flowFiles, MergeRecord.REL_FAILURE);
+                session.commit();
+            }
+
+            return true;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public boolean isFull() {
+        readLock.lock();
+        try {
+            if (!isFullEnough()) {
+                return false;
+            }
+
+            int maxRecords;
+            final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+            if (recordCountAttribute.isPresent()) {
+                final Optional<String> recordCountValue = flowFiles.stream()
+                    .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null)
+                    .map(ff -> ff.getAttribute(recordCountAttribute.get()))
+                    .findFirst();
+
+                if (!recordCountValue.isPresent()) {
+                    return false;
+                }
+
+                try {
+                    maxRecords = Integer.parseInt(recordCountValue.get());
+                } catch (final NumberFormatException e) {
+                    maxRecords = 1;
+                }
+            } else {
+                maxRecords = thresholds.getMaxRecords();
+            }
+
+            if (recordCount >= maxRecords) {
+                return true;
+            }
+
+            if (out.getBytesWritten() >= thresholds.getMaxBytes()) {
+                return true;
+            }
+
+            return false;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public boolean isFullEnough() {
+        readLock.lock();
+        try {
+            if (flowFiles.isEmpty()) {
+                return false;
+            }
+
+            int requiredRecordCount;
+            final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+            if (recordCountAttribute.isPresent()) {
+                final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
+                try {
+                    requiredRecordCount = Integer.parseInt(recordCountValue);
+                } catch (final NumberFormatException e) {
+                    requiredRecordCount = 1;
+                }
+            } else {
+                requiredRecordCount = thresholds.getMinRecords();
+            }
+
+            return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+
+    public void rollback() {
+        complete = true;
+        logger.debug("Marked {} as complete because rollback() was called", new Object[] {this});
+
+        writeLock.lock();
+        try {
+            if (recordWriter != null) {
+                try {
+                    recordWriter.close();
+                } catch (IOException e) {
+                    logger.warn("Failed to close Record Writer", e);
+                }
+            }
+
+            session.rollback();
+
+            if (logger.isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> " id=" + ff.getId() + ",").collect(Collectors.toList());
+                logger.debug("Rolled back bin {} containing input FlowFiles {}", new Object[] {this, ids});
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private long getBinAge() {
+        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationNanos);
+    }
+
+    private void fail() {
+        complete = true;
+        logger.debug("Marked {} as complete because fail() was called", new Object[] {this});
+
+        writeLock.lock();
+        try {
+            if (recordWriter != null) {
+                try {
+                    recordWriter.close();
+                } catch (IOException e) {
+                    logger.warn("Failed to close Record Writer", e);
+                }
+            }
+
+            session.remove(merged);
+            session.transfer(flowFiles, MergeRecord.REL_FAILURE);
+            session.commit();
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public void complete(final String completionReason) throws IOException {
+        writeLock.lock();
+        try {
+            if (isComplete()) {
+                logger.debug("Cannot complete {} because it is already completed", new Object[] {this});
+                return;
+            }
+
+            complete = true;
+            logger.debug("Marked {} as complete because complete() was called", new Object[] {this});
+
+            if (flowFiles.isEmpty()) {
+                session.remove(merged);
+                return;
+            }
+
+            final WriteResult writeResult = recordWriter.finishRecordSet();
+            recordWriter.close();
+            logger.debug("Closed Record Writer using session {} for {}", new Object[] {session, this});
+
+            // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin.
+            final Optional<String> countAttr = thresholds.getRecordCountAttribute();
+            if (countAttr.isPresent()) {
+                // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value.
+                Integer expectedBinCount = null;
+                for (final FlowFile flowFile : flowFiles) {
+                    final String countVal = flowFile.getAttribute(countAttr.get());
+                    if (countVal == null) {
+                        continue;
+                    }
+
+                    final int count;
+                    try {
+                        count = Integer.parseInt(countVal);
+                    } catch (final NumberFormatException nfe) {
+                        logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number",
+                            new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile});
+                        fail();
+                        return;
+                    }
+
+                    if (expectedBinCount != null && count != expectedBinCount) {
+                        logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}",
+                            new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount});
+                        fail();
+                        return;
+                    }
+
+                    expectedBinCount = count;
+                }
+
+                if (expectedBinCount == null) {
+                    logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles",
+                        new Object[] {flowFiles.size(), countAttr.get()});
+                    fail();
+                    return;
+                }
+
+                if (expectedBinCount != flowFiles.size()) {
+                    logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted "
+                        + "(due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).",
+                        new Object[] {flowFiles.size(), countAttr.get(), expectedBinCount, flowFiles.size(), expectedBinCount});
+                    fail();
+                    return;
+                }
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+
+            final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
+            final Map<String, String> mergedAttributes = attributeStrategy.getMergedAttributes(flowFiles);
+            attributes.putAll(mergedAttributes);
+
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
+            attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
+            attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
+
+            merged = session.putAllAttributes(merged, attributes);
+
+            session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
+            session.transfer(merged, MergeRecord.REL_MERGED);
+            session.transfer(flowFiles, MergeRecord.REL_ORIGINAL);
+            session.adjustCounter("Records Merged", writeResult.getRecordCount(), false);
+            session.commit();
+
+            if (logger.isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                logger.debug("Completed bin {} with {} records with Merged FlowFile {} using input FlowFiles {}", new Object[] {this, writeResult.getRecordCount(), merged, ids});
+            }
+        } catch (final Exception e) {
+            session.rollback(true);
+            throw e;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public String toString() {
+        readLock.lock();
+        try {
+            return "RecordBin[size=" + flowFiles.size() + ", full=" + isFull() + ", isComplete=" + isComplete() + ", id=" + id + "]";
+        } finally {
+            readLock.unlock();
+        }
+    }
+}
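
offer() above combines a cheap volatile pre-check with a re-check after taking the write lock, so a bin that completes while a caller is waiting is never written to, and non-blocking callers can simply move on to another bin. A standalone sketch of that check/lock/re-check pattern (names are illustrative, not from this commit):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class BinLockSketch {
        private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
        private volatile boolean complete = false;

        // Returns false if the bin is complete or (when block == false) the lock is busy;
        // both outcomes tell the caller to try another bin.
        boolean offer(final Runnable writeRecords, final boolean block) {
            if (complete) {
                return false;               // cheap pre-check, no lock needed
            }
            final boolean locked;
            if (block) {
                writeLock.lock();
                locked = true;
            } else {
                locked = writeLock.tryLock();
            }
            if (!locked) {
                return false;               // another thread holds the bin; move on
            }
            try {
                if (complete) {
                    return false;           // completed while we waited for the lock
                }
                writeRecords.run();         // safe: we hold the lock and the bin is open
                return true;
            } finally {
                writeLock.unlock();
            }
        }
    }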

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
new file mode 100644
index 0000000..8496a4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.MergeContent;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+
+public class RecordBinManager {
+
+    private final ProcessContext context;
+    private final ProcessSessionFactory sessionFactory;
+    private final ComponentLog logger;
+    private final int maxBinCount;
+
+    private final AtomicLong maxBinAgeNanos = new AtomicLong(Long.MAX_VALUE);
+    private final Map<String, List<RecordBin>> groupBinMap = new HashMap<>(); // guarded by lock
+    private final Lock lock = new ReentrantLock();
+
+    private final AtomicInteger binCount = new AtomicInteger(0);
+
+    public RecordBinManager(final ProcessContext context, final ProcessSessionFactory sessionFactory, final ComponentLog logger) {
+        this.context = context;
+        this.sessionFactory = sessionFactory;
+        this.logger = logger;
+
+        final Integer maxBins = context.getProperty(MergeRecord.MAX_BIN_COUNT).asInteger();
+        this.maxBinCount = maxBins == null ? Integer.MAX_VALUE : maxBins.intValue();
+    }
+
+    /**
+     * Must be called only when there are no active threads modifying the bins.
+     */
+    public void purge() {
+        lock.lock();
+        try {
+            for (final List<RecordBin> binList : groupBinMap.values()) {
+                for (final RecordBin bin : binList) {
+                    bin.rollback();
+                }
+            }
+            groupBinMap.clear();
+            binCount.set(0);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public void setMaxBinAge(final Long timePeriod, final TimeUnit timeUnit) {
+        if (timePeriod == null) {
+            maxBinAgeNanos.set(Long.MAX_VALUE);
+        } else {
+            maxBinAgeNanos.set(timeUnit.toNanos(timePeriod));
+        }
+    }
+
+
+    public int getBinCount() {
+        return binCount.get();
+    }
+
+    /**
+     * Adds the given FlowFile to the first available bin in which it fits for the given group, or creates a new bin in the specified group if necessary.
+     * <p/>
+     *
+     * @param groupIdentifier the group to which the flow file belongs; can be null
+     * @param flowFile flowFile to bin
+     * @param reader RecordReader to use for reading FlowFile
+     * @param session the ProcessSession to which the FlowFiles belong
+     * @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
+     *            that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
+     *
+     * @throws SchemaNotFoundException if unable to find the schema for the record writer
+     * @throws MalformedRecordException if unable to read a record
+     * @throws IOException if there is an IO problem reading from the stream or writing to the stream
+     */
+    public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
+        throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+        final List<RecordBin> currentBins;
+        lock.lock();
+        try {
+            // Create a new List<RecordBin> if none exists for this Group ID. We use a CopyOnWriteArrayList here because
+            // we need to traverse the list in a couple of places, and just below we call bin.offer() (which is very
+            // expensive) while traversing the List, so we don't want to do that within a synchronized block. If this
+            // ends up performing poorly, we could instead use a synchronized List and, rather than calling bin.offer()
+            // while iterating, use some sort of bin.tryLock() that locks only if the FlowFile should be added; when it
+            // returns true, we would stop iterating, perform the expensive part, and then ensure that we always unlock.
+            currentBins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
+        } finally {
+            lock.unlock();
+        }
+
+        RecordBin acceptedBin = null;
+        for (final RecordBin bin : currentBins) {
+            final boolean accepted = bin.offer(flowFile, reader, session, block);
+
+            if (accepted) {
+                acceptedBin = bin;
+                logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), bin});
+                break;
+            }
+        }
+
+        // We have to do this outside of our for-loop above in order to avoid a ConcurrentModificationException.
+        if (acceptedBin != null) {
+            if (acceptedBin.isComplete()) {
+                removeBins(groupIdentifier, Collections.singletonList(acceptedBin));
+            }
+
+            return;
+        }
+
+        // If we've reached this point, we could not fit the FlowFile into any existing bin, so we have to create a new one.
+        final RecordBin bin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds());
+        final boolean binAccepted = bin.offer(flowFile, reader, session, true);
+        if (!binAccepted) {
+            session.rollback();
+            throw new RuntimeException("Attempted to add " + flowFile + " to a new bin but failed. This is unexpected. Will roll back session and try again.");
+        }
+
+        logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), bin});
+
+        if (!bin.isComplete()) {
+            final int updatedBinCount = binCount.incrementAndGet();
+
+            lock.lock();
+            try {
+                // We have already obtained the list of RecordBins from this Map above. However, we released
+                // the lock in order to avoid blocking while writing to a Bin. Because of this, it is possible
+                // that another thread has since removed this List from the Map, if all Bins in the List have
+                // been completed. As a result, we must now obtain the lock again, obtain the List (or a new
+                // one), and then update that one. If we didn't do this, we could lose track of a Bin
+                // entirely.
+                final List<RecordBin> bins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
+                bins.add(bin);
+            } finally {
+                lock.unlock();
+            }
+
+            if (updatedBinCount > maxBinCount) {
+                completeOldestBin();
+            }
+        }
+    }
+
+
+    private RecordBinThresholds createThresholds() {
+        final int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
+        final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
+        final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
+
+        final PropertyValue maxSizeValue = context.getProperty(MergeRecord.MAX_SIZE);
+        final long maxBytes = maxSizeValue.isSet() ? maxSizeValue.asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
+
+        final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
+        final String maxBinAge = maxMillisValue.getValue();
+        final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE;
+
+        final String recordCountAttribute;
+        final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
+        if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+        } else {
+            recordCountAttribute = null;
+        }
+
+        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+    }
+
+
+    public void completeOldestBin() throws IOException {
+        RecordBin oldestBin = null;
+
+        lock.lock();
+        try {
+            String oldestBinGroup = null;
+
+            for (final Map.Entry<String, List<RecordBin>> group : groupBinMap.entrySet()) {
+                for (final RecordBin bin : group.getValue()) {
+                    if (oldestBin == null || bin.isOlderThan(oldestBin)) {
+                        oldestBin = bin;
+                        oldestBinGroup = group.getKey();
+                    }
+                }
+            }
+
+            if (oldestBin == null) {
+                return;
+            }
+
+            removeBins(oldestBinGroup, Collections.singletonList(oldestBin));
+        } finally {
+            lock.unlock();
+        }
+
+        logger.debug("Completing Bin " + oldestBin + " because the maximum number of bins has been exceeded");
+        oldestBin.complete("Maximum number of bins has been exceeded");
+    }
+
+
+    public void completeExpiredBins() throws IOException {
+        final long maxNanos = maxBinAgeNanos.get();
+        final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
+
+        lock.lock();
+        try {
+            for (final Map.Entry<String, List<RecordBin>> entry : groupBinMap.entrySet()) {
+                final String key = entry.getKey();
+                final List<RecordBin> bins = entry.getValue();
+
+                for (final RecordBin bin : bins) {
+                    if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+                        final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
+                        expiredBinsForKey.add(bin);
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
+            final String key = entry.getKey();
+            final List<RecordBin> expiredBins = entry.getValue();
+
+            for (final RecordBin bin : expiredBins) {
+                logger.debug("Completing Bin {} because it has expired");
+                bin.complete("Bin has reached Max Bin Age");
+            }
+
+            removeBins(key, expiredBins);
+        }
+    }
+
+    private void removeBins(final String key, final List<RecordBin> bins) {
+        lock.lock();
+        try {
+            final List<RecordBin> list = groupBinMap.get(key);
+            if (list != null) {
+                final int initialSize = list.size();
+                list.removeAll(bins);
+
+                // Determine how many items were removed from the list and
+                // update our binCount to keep track of this.
+                final int removedCount = initialSize - list.size();
+                binCount.addAndGet(-removedCount);
+
+                if (list.isEmpty()) {
+                    groupBinMap.remove(key);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}
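
RecordBinManager above keeps the group-to-bins Map guarded by a ReentrantLock but stores each group's bins in a CopyOnWriteArrayList, so the expensive per-bin offer() calls run outside the Map lock while iteration stays safe against concurrent additions; once the non-blocking pass over existing bins fails, the fresh bin is registered under a re-acquired lock because the List may have been removed in the meantime. A condensed standalone sketch of that pattern (types and names are illustrative, not from this commit):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class GroupedBinSketch {

        interface Bin {
            boolean offer(String item);   // expensive; may reject the item
        }

        private final Map<String, List<Bin>> groups = new HashMap<>(); // guarded by lock
        private final Lock lock = new ReentrantLock();

        public void add(final String group, final String item, final Bin freshBin) {
            final List<Bin> bins;
            lock.lock();
            try {
                // Only the Map lookup/creation happens under the lock.
                bins = groups.computeIfAbsent(group, g -> new CopyOnWriteArrayList<>());
            } finally {
                lock.unlock();
            }

            // Traverse and offer outside the lock; CopyOnWriteArrayList tolerates
            // concurrent adds during iteration without ConcurrentModificationException.
            for (final Bin bin : bins) {
                if (bin.offer(item)) {
                    return;
                }
            }

            // Nothing accepted the item: fill a fresh bin, then register it under a
            // re-acquired lock, re-fetching the List in case it was removed meanwhile.
            freshBin.offer(item);
            lock.lock();
            try {
                groups.computeIfAbsent(group, g -> new CopyOnWriteArrayList<>()).add(freshBin);
            } finally {
                lock.unlock();
            }
        }
    }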