You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/07/12 20:58:11 UTC

[1/2] nifi git commit: NIFI-4060: Initial implementation of MergeRecord

Repository: nifi
Updated Branches:
  refs/heads/master eefad2916 -> b603cb955


http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
new file mode 100644
index 0000000..16c05ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.Optional;
+
+public class RecordBinThresholds {
+    private final int minRecords;
+    private final int maxRecords;
+    private final long minBytes;
+    private final long maxBytes;
+    private final long maxBinMillis;
+    private final String maxBinAge;
+    private final Optional<String> recordCountAttribute;
+
+    public RecordBinThresholds(final int minRecords, final int maxRecords, final long minBytes, final long maxBytes, final long maxBinMillis,
+        final String maxBinAge, final String recordCountAttribute) {
+        this.minRecords = minRecords;
+        this.maxRecords = maxRecords;
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.maxBinMillis = maxBinMillis;
+        this.maxBinAge = maxBinAge;
+        this.recordCountAttribute = Optional.ofNullable(recordCountAttribute);
+    }
+
+    public int getMinRecords() {
+        return minRecords;
+    }
+
+    public int getMaxRecords() {
+        return maxRecords;
+    }
+
+    public long getMinBytes() {
+        return minBytes;
+    }
+
+    public long getMaxBytes() {
+        return maxBytes;
+    }
+
+    public long getMaxBinMillis() {
+        return maxBinMillis;
+    }
+
+    public String getMaxBinAge() {
+        return maxBinAge;
+    }
+
+    public Optional<String> getRecordCountAttribute() {
+        return recordCountAttribute;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 09681da..aa8e931 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -62,6 +62,7 @@ org.apache.nifi.processors.standard.LogMessage
 org.apache.nifi.processors.standard.LookupAttribute
 org.apache.nifi.processors.standard.LookupRecord
 org.apache.nifi.processors.standard.MergeContent
+org.apache.nifi.processors.standard.MergeRecord
 org.apache.nifi.processors.standard.ModifyBytes
 org.apache.nifi.processors.standard.MonitorActivity
 org.apache.nifi.processors.standard.Notify

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
new file mode 100644
index 0000000..ebd8856
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html
@@ -0,0 +1,229 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>MergeRecord</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+
+        <h3>Introduction</h3>
+    	<p>
+    	    The MergeRecord Processor allows the user to take many FlowFiles that consist of record-oriented data (any data format for which there is
+    	    a Record Reader available) and combine the FlowFiles into one larger FlowFile. This may be preferable before pushing the data to a downstream
+    	    system that prefers larger batches of data, such as HDFS, or in order to improve performance of a NiFi flow by reducing the number of FlowFiles
+    	    that flow through the system (thereby reducing the contention placed on the FlowFile Repository, Provenance Repository, Content Repository, and
+    	    FlowFile Queues).
+    	</p>
+
+    	<p>
+    		The Processor creates several 'bins' to put the FlowFiles in. The maximum number of bins to use is set to 5 by default, but this can be changed
+    		by updating the value of the &lt;Maximum number of Bins&gt; property. The number of bins is bounded in order to avoid running out of Java heap space.
+    		Note: while the contents of a FlowFile are stored in the Content Repository and not in the Java heap space, the Processor must hold the FlowFile
+    		objects themselves in memory. As a result, these FlowFiles with their attributes can potentially take up a great deal of heap space and cause
+    		OutOfMemoryError's to be thrown. In order to avoid this, if you expect to merge many small FlowFiles together, it is advisable to instead use a
+    		MergeRecord that merges no more than, say, 1,000 FlowFiles into a bundle and then use a second MergeRecord to merge these small bundles into
+    		larger bundles. For example, to merge 1,000,000 FlowFiles together, use MergeRecord that uses a &lt;Maximum Number of Records&gt; of 1,000 and route the
+    		"merged" Relationship to a second MergeRecord that also sets the &lt;Maximum Number of Records&gt; to 1,000. The second MergeRecord will then merge 1,000 bundles
+    		of 1,000, which in effect produces bundles of 1,000,000.
+    	</p>
+
+
+
+    	<h3>How FlowFiles are Binned</h3>
+    	<p>
+    	    How the Processor determines which bin to place a FlowFile in depends on a few different configuration options. Firstly, the Merge Strategy
+    	    is considered. The Merge Strategy can be set to one of two options: Bin Packing Algorithm, or Defragment. When the goal is to simply combine
+    	    smaller FlowFiles into one larger FlowFile, the Bin Packing Algorithm should be used. This algorithm picks a bin based on whether or not the FlowFile
+    	    can fit in the bin according to its size and the &lt;Maximum Bin Size&gt; property and whether or not the FlowFile is 'like' the other FlowFiles in
+    	    the bin. What it means for two FlowFiles to be 'like FlowFiles' is discussed at the end of this section.
+    	</p>
+    	
+    	<p>
+    	    The "Defragment" Merge Strategy can be used when records need to be explicitly assigned to the same bin. For example, if data is split apart using
+    	    the SplitRecord Processor, each 'split' can be processed independently and later merged back together using this Processor with the
+    	    Merge Strategy set to Defragment. In order for FlowFiles to be added to the same bin when using this configuration, the FlowFiles must have the same
+    	    value for the "fragment.identifier" attribute. Each FlowFile with the same identifier must also have the same value for the "fragment.count" attribute
+    	    (which indicates how many FlowFiles belong in the bin) and a unique value for the "fragment.index" attribute so that the FlowFiles can be ordered
+    	    correctly.
+    	</p>
+    	
+    	<p>
+    	    In order to be added to the same bin, two FlowFiles must be 'like FlowFiles.' In order for two FlowFiles to be like FlowFiles, they must have the same
+    	    schema, and if the &lt;Correlation Attribute Name&gt; property is set, they must have the same value for the specified attribute. For example, if the
+    	    &lt;Correlation Attribute Name&gt; is set to "filename" then two FlowFiles must have the same value for the "filename" attribute in order to be binned
+    	    together. If more than one attribute is needed in order to correlate two FlowFiles, it is recommended to use an UpdateAttribute processor before the
+    	    MergeRecord processor and combine the attributes. For example, if the goal is to bin together two FlowFiles only if they have the same value for the
+    	    "abc" attribute and the "xyz" attribute, then we could accomplish this by using UpdateAttribute and adding a property with name "correlation.attribute"
+    	    and a value of "abc=${abc},xyz=${xyz}" and then setting MergeRecord's &lt;Correlation Attribute Name&gt; property to "correlation.attribute".
+    	</p>
+    	
+    	<p>
+    		It is often useful to bin together only Records that have the same value for some field. For example, if we have point-of-sale data, perhaps the desire
+    		is to bin together records that belong to the same store, as identified by the 'storeId' field. This can be accomplished by making use of the PartitionRecord
+    		Processor ahead of MergeRecord. This Processor will allow one or more fields to be configured as the partitioning criteria and will create attributes for those
+    		corresponding values. An UpdateAttribute processor could then be used, if necessary, to combine multiple attributes into a single correlation attribute,
+    		as described above. See documentation for those processors for more details.
+    	</p>
+
+
+
+		<h3>When a Bin is Merged</h3>    	
+    	<p>
+    	    Above, we discussed how a bin is chosen for a given FlowFile. Once a bin has been created and FlowFiles added to it, we must have some way to determine
+    	    when a bin is "full" so that we can merge those FlowFiles together into a "merged" FlowFile. There are a few criteria that are used to make a determination as
+    	    to whether or not a bin should be merged.
+    	</p>
+
+		<p>
+		    If the &lt;Merge Strategy&gt; property is set to "Bin Packing Algorithm" then the following rules will be evaluated.
+		    Firstly, in order for a bin to be full, both of the thresholds specified by the &lt;Minimum Bin Size&gt; and the &lt;Minimum Number of Records&gt; properties
+		    must be satisfied. If one of these properties is not set, then it is ignored. Secondly, if either the &lt;Maximum Bin Size&gt; or the &lt;Maximum Number of
+		    Records&gt; property is reached, then the bin is merged. That is, both of the minimum values must be reached but only one of the maximum values need be reached.
+		    Note that the &lt;Maximum Number of Records&gt; property is a "soft limit," meaning that all records in a given input FlowFile will be added to the same bin, and
+		    as a result the number of records may exceed the maximum configured number of records. Once this happens, though, no more Records will be added to that same bin
+		    from another FlowFile.
+		    If the &lt;Max Bin Age&gt; is reached for a bin, then the FlowFiles in that bin will be merged, <b>even if</b> the minimum bin size and minimum number of records
+		    have not yet been met. Finally, if the maximum number of bins have been created (as specified by the &lt;Maximum number of Bins&gt; property), and some input FlowFiles
+		    cannot fit into any of the existing bins, then the oldest bin will be merged to make room. This is done because otherwise we would not be able to add any
+		    additional FlowFiles to the existing bins and would have to wait until the Max Bin Age is reached (if ever) in order to merge any FlowFiles.
+		</p>
+
+        <p>
+            If the &lt;Merge Strategy&gt; property is set to "Defragment" then a bin is full only when the number of FlowFiles in the bin is equal to the number specified
+            by the "fragment.count" attribute of one of the FlowFiles in the bin. All FlowFiles that have this attribute must have the same value for this attribute,
+            or else they will be routed to the "failure" relationship. It is not necessary that all FlowFiles have this value, but at least one FlowFile in the bin must have
+            this value or the bin will never be complete. If all of the necessary FlowFiles are not binned together by the point at which the bin times out
+            (as specified by the &lt;Max Bin Age&gt; property), then the FlowFiles will all be routed to the 'failure' relationship instead of being merged together.
+        </p>
+
+        <p>
+            Once a bin is merged into a single FlowFile, it can sometimes be useful to understand why exactly the bin was merged when it was. For example, if the maximum number
+            of allowable bins is reached, a merged FlowFile may consist of far fewer records than expected. In order to help understand the behavior, the Processor will emit
+            a JOIN Provenance Event when creating the merged FlowFile, and the JOIN event will include in it a "Details" field that explains why the bin was merged when it was.
+            For example, the event will indicate "Records Merged due to: Bin is full" if the bin reached its minimum thresholds and no more subsequent FlowFiles were able to be
+            added to it. Or it may indicate "Records Merged due to: Maximum number of bins has been exceeded" if the bin was merged due to the configured maximum number of bins
+            being filled and needing to free up space for a new bin.
+        </p>
+
+
+    	<h3>When a Failure Occurs</h3>
+    	<p>
+    	    When a bin is filled, the Processor is responsible for merging together all of the records in those FlowFiles into a single FlowFile. If the Processor fails
+    	    to do so for any reason (for example, a Record cannot be read from an input FlowFile), then all of the FlowFiles in that bin are routed to the 'failure'
+    	    Relationship. The Processor does not skip the single problematic FlowFile and merge the others. This behavior was chosen because of two different considerations.
+    	    Firstly, without those problematic records, the bin may not truly be full, as the minimum bin size may not be reached without those records.
+    	    Secondly, and more importantly, if the problematic FlowFile contains 100 "good" records before the problematic ones, those 100 records would already have been
+    	    written to the "merged" FlowFile. We cannot un-write those records. If we were to then send those 100 records on and route the problematic FlowFile to 'failure'
+    	    then in a situation where the "failure" relationship is eventually routed back to MergeRecord, we could end up continually duplicating those 100 successfully
+    	    processed records.
+    	</p>
+    	
+    	
+    	
+    	<h2>Examples</h2>
+    	
+    	<p>
+    		To better understand how this Processor works, we will lay out a few examples. For the sake of simplicity of these examples, we will use CSV-formatted data and
+    		write the merged data as CSV-formatted data, but
+    		the format of the data is not really relevant, as long as there is a Record Reader that is capable of reading the data and a Record Writer capable of writing
+    		the data in the desired format.
+    	</p>
+
+
+
+    	<h3>Example 1 - Batching Together Many Small FlowFiles</h3>
+    	
+    	<p>
+    		When we want to batch together many small FlowFiles in order to create one larger FlowFile, we will accomplish this by using the "Bin Packing Algorithm"
+    		Merge Strategy. The idea here is to bundle together as many FlowFiles as we can within our minimum and maximum number of records and bin size.
+    		Consider that we have the following properties set:
+    	</p>
+
+<table>
+  <tr>
+    <th>Property Name</th>
+    <th>Property Value</th>
+  </tr>
+  <tr>
+    <td>Merge Strategy</td>
+    <td>Bin Packing Algorithm</td>
+  </tr>
+  <tr>
+    <td>Minimum Number of Records</td>
+    <td>3</td>
+  </tr>
+  <tr>
+    <td>Maximum Number of Records</td>
+    <td>5</td>
+  </tr>
+</table>
+
+        <p>
+            Also consider that we have the following data on the queue, with the schema indicating a Name and an Age field:
+        </p>
+
+<table>
+  <tr>
+    <th>FlowFile ID</th>
+    <th>FlowFile Contents</th>
+  </tr>
+  <tr>
+    <td>1</td>
+    <td>Mark, 33</td>
+  </tr>
+  <tr>
+    <td>2</td>
+    <td>John, 45<br />Jane, 43</td>
+  </tr>
+  <tr>
+    <td>3</td>
+    <td>Jake, 3</td>
+  </tr>
+  <tr>
+    <td>4</td>
+    <td>Jan, 2</td>
+  </tr>
+</table>
+
+		<p>
+			In this case, because we have not configured a Correlation Attribute, and because all FlowFiles have the same schema, the Processor
+			will attempt to add all of these FlowFiles to the same bin. Because the Minimum Number of Records is 3 and the Maximum Number of Records is 5,
+			all of the FlowFiles will be added to the same bin. The output, then, is a single FlowFile with the following content:
+		</p>
+
+<code>
+<pre>
+Mark, 33
+John, 45
+Jane, 43
+Jake, 3
+Jan, 2
+</pre>
+</code>
+
+		<p>
+		   When the Processor runs, it will bin all of the FlowFiles that it can get from the queue. After that, it will merge any bin that is "full enough."
+		   So if we had only 3 FlowFiles on the queue, those 3 would have been added, and a new bin would have been created in the next iteration, once the
+		   4th FlowFile showed up. However, if we had 8 FlowFiles queued up, only 5 would have been added to the first bin. The other 3 would have been added
+		   to a second bin, and that bin would then be merged since it reached the minimum threshold of 3 also.
+		</p>
+
+	</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index b8f9210..4cd4460 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -48,8 +50,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -826,7 +827,7 @@ public class TestMergeContent {
     @Test
     public void testUniqueAttributes() {
         final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
-        runner.setProperty(MergeContent.ATTRIBUTE_STRATEGY, MergeContent.ATTRIBUTE_STRATEGY_ALL_UNIQUE);
+        runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_UNIQUE);
         runner.setProperty(MergeContent.MAX_SIZE, "2 B");
         runner.setProperty(MergeContent.MIN_SIZE, "2 B");
 
@@ -856,7 +857,7 @@ public class TestMergeContent {
     @Test
     public void testCommonAttributesOnly() {
         final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
-        runner.setProperty(MergeContent.ATTRIBUTE_STRATEGY, MergeContent.ATTRIBUTE_STRATEGY_ALL_COMMON);
+        runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_COMMON);
         runner.setProperty(MergeContent.MAX_SIZE, "2 B");
         runner.setProperty(MergeContent.MIN_SIZE, "2 B");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
new file mode 100644
index 0000000..5ee4dd9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestMergeRecord {
+    private TestRunner runner;
+    private CommaSeparatedRecordReader readerService;
+    private MockRecordWriter writerService;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(new MergeRecord());
+
+        readerService = new CommaSeparatedRecordReader();
+        writerService = new MockRecordWriter("header", false);
+
+        runner.addControllerService("reader", readerService);
+
+        runner.enableControllerService(readerService);
+
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(MergeRecord.RECORD_READER, "reader");
+        runner.setProperty(MergeRecord.RECORD_WRITER, "writer");
+    }
+
+    @Test
+    public void testMergeSimple() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        runner.run(2);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        mff.assertAttributeEquals("record.count", "2");
+        mff.assertContentEquals("header\nJohn,35\nJane,34\n");
+    }
+
+
+    // Verify that FlowFiles are grouped with like schemas.
+    @Test
+    public void testDifferentSchema() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Color\nJane, Red");
+
+        runner.run(2, false, true);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        runner.enqueue("Name, Age\nJane, 34");
+        runner.enqueue("Name, Color\nJohn, Blue");
+
+        runner.run(2, true, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
+
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray())))
+            .count());
+
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJane,Red\nJohn,Blue\n".equals(new String(ff.toByteArray())))
+            .count());
+    }
+
+    @Test
+    public void testFailureToParse() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+
+        readerService.failAfter(2);
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+        runner.enqueue("Name, Age\nJake, 3");
+
+        runner.run();
+
+        // We have to route all of the FlowFiles in the same bin to the 'failure' relationship.
+        // Otherwise, we may have read some of the records from the failing FlowFile and then
+        // routed it to failure, which would result in some of its records moving on and others not.
+        // This, in turn, would result in the same records being added to potentially many FlowFiles.
+        runner.assertAllFlowFilesTransferred(MergeRecord.REL_FAILURE, 3);
+    }
+
+    @Test
+    public void testDefragment() {
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+        final Map<String, String> attr1 = new HashMap<>();
+        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+
+        final Map<String, String> attr2 = new HashMap<>();
+        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+
+        final Map<String, String> attr3 = new HashMap<>();
+        attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+
+        final Map<String, String> attr4 = new HashMap<>();
+        attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+
+        runner.enqueue("Name, Age\nJohn, 35", attr1);
+        runner.enqueue("Name, Age\nJane, 34", attr2);
+
+        runner.enqueue("Name, Age\nJake, 3", attr3);
+        runner.enqueue("Name, Age\nJan, 2", attr4);
+
+        runner.run(4);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
+
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray())))
+            .count());
+
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "2".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJake,3\nJan,2\n".equals(new String(ff.toByteArray())))
+            .count());
+    }
+
+
+    @Test
+    public void testMinSize() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
+        runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        final StringBuilder sb = new StringBuilder("Name, Age\n");
+        for (int i = 0; i < 100; i++) {
+            sb.append("Person " + i + ", " + i + "\n");
+        }
+        runner.enqueue(sb.toString());
+
+        runner.run();
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
+    }
+
+    @Test
+    public void testMinRecords() { // expects no merge until MIN_RECORDS (103) records have been queued
+        runner.setProperty(MergeRecord.MIN_RECORDS, "103");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "2"); // NOTE(review): lower than MIN_RECORDS (103) - confirm this is intentional
+        runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+        // The three FlowFiles queued below carry 1 + 1 + 100 = 102 records, one short of MIN_RECORDS.
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+
+        final StringBuilder sb = new StringBuilder("Name, Age\n");
+        for (int i = 0; i < 100; i++) {
+            sb.append("Person " + i + ", " + i + "\n");
+        }
+        runner.enqueue(sb.toString());
+        // First trigger with 102 records queued: nothing is expected on either relationship.
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        // One more single-record FlowFile brings the total to 103: expect one merged FlowFile and all four inputs on REL_ORIGINAL.
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
+    }
+
+    @Test
+    public void testMaxRecords() { // 34 single-record inputs with MAX_RECORDS=10: expects 3 merged outputs and 4 inputs left over
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
+        // Each FlowFile is a header line followed by exactly one record.
+        for (int i = 0; i < 34; i++) {
+            runner.enqueue("Name, Age\nJohn, 35");
+        }
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 3);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30);
+        // 34 - 30 = 4 FlowFiles (fewer than MIN_RECORDS) are expected to still be counted by runner.getQueueSize().
+        assertEquals(4, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testMaxSize() { // 36 single-record inputs with MAX_SIZE=100 B: expects 3 merged outputs and 3 inputs left over
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_SIZE, "100 B");
+        // Each FlowFile is a header line followed by exactly one record (19 characters of content).
+        for (int i = 0; i < 36; i++) {
+            runner.enqueue("Name, Age\nJohnny, 5");
+        }
+
+        runner.run();
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 3);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 33);
+        // 36 - 33 = 3 FlowFiles (fewer than MIN_RECORDS) are expected to still be counted by runner.getQueueSize().
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    @Ignore("This unit test depends on timing and could potentially cause problems in an automated build environment. However, it can be useful for manual testing")
+    public void testTimeout() throws InterruptedException { // expects an under-filled bin to be merged once MAX_BIN_AGE has elapsed
+        runner.setProperty(MergeRecord.MIN_RECORDS, "500");
+        runner.setProperty(MergeRecord.MAX_BIN_AGE, "500 millis");
+        // Only 100 single-record FlowFiles are queued, far fewer than MIN_RECORDS (500).
+        for (int i = 0; i < 100; i++) {
+            runner.enqueue("Name, Age\nJohnny, 5");
+        }
+
+        runner.run(1, false);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        // Wait 750 ms, longer than the 500 ms MAX_BIN_AGE, before triggering the processor again.
+        Thread.sleep(750);
+        runner.run(1, true, false);
+
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 100);
+    }
+
+
+    @Test
+    public void testBinCount() { // with MAX_BIN_COUNT=5, a sixth correlation id is expected to cause exactly one merge
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
+        runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
+        runner.setProperty(MergeRecord.CORRELATION_ATTRIBUTE_NAME, "correlationId");
+        // Queue five single-record FlowFiles, each carrying a distinct correlationId ("0" to "4").
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 5; i++) {
+            attrs.put("correlationId", String.valueOf(i));
+            runner.enqueue("Name, Age\nJohn, 3" + i, attrs);
+        }
+
+        runner.run(1, false);
+        // After the first trigger nothing is expected on any relationship.
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+        // Queue a FlowFile with a sixth correlationId ("5"); getBinCount() is expected to report 5 before the second trigger.
+        attrs.put("correlationId", "5");
+        runner.enqueue("Name, Age\nJohn, 35", attrs);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+        runner.run(1, false, false);
+        // Expect one merged FlowFile built from one input, no failures, and the bin count still at 5.
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+    }
+
+    @Test
+    public void testDefragmentOldestBinFailsWhenTooManyBins() { // Defragment strategy: a sixth fragment id with MAX_BIN_COUNT=5 is expected to fail one FlowFile
+        runner.setProperty(MergeRecord.MIN_RECORDS, "5");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "10");
+        runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+        // Queue five FlowFiles, each with its own fragment id ("0" to "4") and a declared fragment count of 5.
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "5");
+        for (int i = 0; i < 5; i++) {
+            attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, String.valueOf(i));
+            runner.enqueue("Name, Age\nJohn, 3" + i, attrs);
+        }
+
+        runner.run(1, false);
+        // After the first trigger nothing is expected on any relationship.
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
+        // Queue a FlowFile with a sixth fragment id ("5"); getBinCount() is expected to report 5 before the second trigger.
+        attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "5");
+        runner.enqueue("Name, Age\nJohn, 35", attrs);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+        runner.run(1, false, false);
+        // Expect nothing merged, exactly one FlowFile on REL_FAILURE, and the bin count still at 5.
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
+        assertEquals(5, ((MergeRecord) runner.getProcessor()).getBinCount());
+    }
+
+    @Test
+    public void testDefragmentExpiredBinFailsOnTimeout() throws InterruptedException { // Defragment strategy: expects a failure once MAX_BIN_AGE has passed
+        runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeRecord.MAX_BIN_AGE, "1 millis");
+        // Queue a single FlowFile with fragment id "0" that declares a fragment count of 5; no other FlowFile is ever queued.
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "5");
+        attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "0");
+        runner.enqueue("Name, Age\nJohn, 30", attrs);
+
+        runner.run(1, false);
+        // Sleep 50 ms, well past the 1 ms MAX_BIN_AGE, then trigger the processor a second time.
+        Thread.sleep(50L);
+        runner.run(1, true, false);
+        // Expect nothing merged and the single input routed to REL_FAILURE.
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+        runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
+    }
+}


[2/2] nifi git commit: NIFI-4060: Initial implementation of MergeRecord

Posted by ma...@apache.org.
NIFI-4060: Initial implementation of MergeRecord

NIFI-4060: Addressed threading issue with RecordBin being updated after it is completed; fixed issue that caused mime.type attribute not to be written properly if all incoming flowfiles already have a different value for that attribute

NIFI-4060: Bug fixes; improved documentation; added a lot of debug information; updated StandardProcessSession to produce more accurate logs in case of a session being committed/rolled back with open input/output streams
Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1958


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b603cb95
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b603cb95
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b603cb95

Branch: refs/heads/master
Commit: b603cb955dcd1d3d9b5e374e5760f2f9b047bda9
Parents: eefad29
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jun 26 13:15:03 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Jul 12 16:36:48 2017 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/bin/BinFiles.java       |  14 +-
 .../record/CommaSeparatedRecordReader.java      | 102 +++++
 .../repository/StandardProcessSession.java      |  19 +-
 .../nifi/processors/standard/MergeContent.java  | 104 +----
 .../nifi/processors/standard/MergeRecord.java   | 358 ++++++++++++++++
 .../standard/merge/AttributeStrategy.java       |  27 ++
 .../standard/merge/AttributeStrategyUtil.java   |  56 +++
 .../merge/KeepCommonAttributeStrategy.java      |  64 +++
 .../merge/KeepUniqueAttributeStrategy.java      |  58 +++
 .../processors/standard/merge/RecordBin.java    | 424 +++++++++++++++++++
 .../standard/merge/RecordBinManager.java        | 295 +++++++++++++
 .../standard/merge/RecordBinThresholds.java     |  69 +++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 229 ++++++++++
 .../processors/standard/TestMergeContent.java   |   9 +-
 .../processors/standard/TestMergeRecord.java    | 360 ++++++++++++++++
 16 files changed, 2076 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index b15d23b..7f79b70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -131,9 +131,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
      *
      * @param context context
      * @param flowFile flowFile
+     * @param session the session for accessing the FlowFile
      * @return The appropriate group ID
      */
-    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
+    protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session);
 
     /**
      * Performs any additional setup of the bin manager. Called during the OnScheduled phase.
@@ -271,8 +272,15 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
             final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
             for (FlowFile flowFile : flowFiles) {
                 flowFile = this.preprocessFlowFile(context, session, flowFile);
-                final String groupingIdentifier = getGroupId(context, flowFile);
-                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
+
+                try {
+                    final String groupingIdentifier = getGroupId(context, flowFile, session);
+                    flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
+                } catch (final Exception e) {
+                    getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e);
+                    session.transfer(flowFile, REL_FAILURE);
+                    continue;
+                }
             }
 
             for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
new file mode 100644
index 0000000..8973055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+
+/** Mock reader factory for unit tests: the first line is a comma-separated header (every field typed STRING); each later line is one record. */
+public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
+    private int failAfterN; // when > -1, nextRecord() throws once this many records have been read
+    private int recordCount = 0; // lives on the factory, so it is shared by every reader this instance creates
+
+    public CommaSeparatedRecordReader() {
+        this(-1);
+    }
+
+    public CommaSeparatedRecordReader(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    public void failAfter(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    @Override
+    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8)); // not the platform default
+        final String headerLine = reader.readLine();
+        if (headerLine == null) { // empty content: fail with a clear message instead of a NullPointerException on split()
+            throw new IOException("Cannot read records from " + flowFile + " because its content is empty; expected a header line");
+        }
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String colName : headerLine.split(",")) {
+            fields.add(new RecordField(colName.trim(), RecordFieldType.STRING.getDataType()));
+        }
+        final RecordSchema schema = new SimpleRecordSchema(fields); // built once and reused for every record and getSchema() call
+
+        return new RecordReader() {
+            @Override
+            public void close() throws IOException {
+                reader.close();
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, MalformedRecordException {
+                if (failAfterN > -1 && recordCount >= failAfterN) {
+                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+                }
+                final String nextLine = reader.readLine();
+                if (nextLine == null) {
+                    return null;
+                }
+                recordCount++;
+
+                final String[] values = nextLine.split(",");
+                if (values.length < fields.size()) { // a short line would otherwise fail with ArrayIndexOutOfBoundsException
+                    throw new MalformedRecordException("Expected " + fields.size() + " values but found " + values.length + " in line: " + nextLine);
+                }
+                final Map<String, Object> valueMap = new HashMap<>();
+                for (int i = 0; i < fields.size(); i++) {
+                    valueMap.put(fields.get(i).getFieldName(), values[i].trim());
+                }
+                return new MapRecord(schema, valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return schema;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index d34d8cf..d2a6af6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -191,13 +191,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         processingStartTime = System.nanoTime();
     }
 
-    private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap) {
+    private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final String action, final String streamType) {
         final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List
         for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) {
             final FlowFile flowFile = entry.getKey();
             final Closeable openStream = entry.getValue();
 
-            LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile);
+            LOG.warn("{} closing {} for {} because the session was {} without the {} stream being closed.", this, openStream, flowFile, action, streamType);
 
             try {
                 openStream.close();
@@ -212,8 +212,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         resetWriteClaims(false);
 
-        closeStreams(openInputStreams);
-        closeStreams(openOutputStreams);
+        closeStreams(openInputStreams, "committed", "input");
+        closeStreams(openOutputStreams, "committed", "output");
 
         if (!readRecursionSet.isEmpty()) {
             throw new IllegalStateException();
@@ -914,8 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         deleteOnCommit.clear();
 
-        closeStreams(openInputStreams);
-        closeStreams(openOutputStreams);
+        closeStreams(openInputStreams, "rolled back", "input");
+        closeStreams(openOutputStreams, "rolled back", "output");
 
         try {
             claimCache.reset();
@@ -2171,7 +2171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
         }
 
-        final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
+        final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
         final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
         final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
         final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
@@ -2470,7 +2470,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     final long bytesWritten = countingOut.getBytesWritten();
                     StandardProcessSession.this.bytesWritten += bytesWritten;
 
-                    openOutputStreams.remove(sourceFlowFile);
+                    final OutputStream removed = openOutputStreams.remove(sourceFlowFile);
+                    if (removed == null) {
+                        LOG.error("Closed Session's OutputStream but there was no entry for it in the map; sourceFlowFile={}; map={}", sourceFlowFile, openOutputStreams);
+                    }
 
                     flush();
                     removeTemporaryClaim(record);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 3401d66..edbc033 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -82,6 +82,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.bin.Bin;
 import org.apache.nifi.processor.util.bin.BinFiles;
 import org.apache.nifi.processor.util.bin.BinManager;
+import org.apache.nifi.processors.standard.merge.AttributeStrategy;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
 import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFilePackager;
@@ -126,7 +128,7 @@ import org.apache.nifi.util.FlowFilePackagerV3;
     @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
     @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
         + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
-@SeeAlso(SegmentContent.class)
+@SeeAlso({SegmentContent.class, MergeRecord.class})
 public class MergeContent extends BinFiles {
 
     // preferred attributes
@@ -201,8 +203,6 @@ public class MergeContent extends BinFiles {
             MERGE_FORMAT_AVRO_VALUE,
             "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");
 
-    public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes";
-    public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes";
 
     public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
     public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
@@ -224,16 +224,6 @@ public class MergeContent extends BinFiles {
             .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
             .defaultValue(MERGE_FORMAT_CONCAT.getValue())
             .build();
-    public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
-            .required(true)
-            .name("Attribute Strategy")
-            .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
-                    + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
-                    + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
-                    + "value, will be preserved.")
-            .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
-            .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON)
-            .build();
 
     public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Correlation Attribute Name")
@@ -315,7 +305,7 @@ public class MergeContent extends BinFiles {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(MERGE_STRATEGY);
         descriptors.add(MERGE_FORMAT);
-        descriptors.add(ATTRIBUTE_STRATEGY);
+        descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
         descriptors.add(CORRELATION_ATTRIBUTE_NAME);
         descriptors.add(MIN_ENTRIES);
         descriptors.add(MAX_ENTRIES);
@@ -378,7 +368,7 @@ public class MergeContent extends BinFiles {
     }
 
     @Override
-    protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
         final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME)
                 .evaluateAttributeExpressions(flowFile).getValue();
         String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName);
@@ -429,16 +419,7 @@ public class MergeContent extends BinFiles {
                 throw new AssertionError();
         }
 
-        final AttributeStrategy attributeStrategy;
-        switch (context.getProperty(ATTRIBUTE_STRATEGY).getValue()) {
-            case ATTRIBUTE_STRATEGY_ALL_UNIQUE:
-                attributeStrategy = new KeepUniqueAttributeStrategy();
-                break;
-            case ATTRIBUTE_STRATEGY_ALL_COMMON:
-            default:
-                attributeStrategy = new KeepCommonAttributeStrategy();
-                break;
-        }
+        final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
 
         final List<FlowFile> contents = bin.getContents();
         final ProcessSession binSession = bin.getSession();
@@ -989,76 +970,7 @@ public class MergeContent extends BinFiles {
         }
     }
 
-    private static class KeepUniqueAttributeStrategy implements AttributeStrategy {
-
-        @Override
-        public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
-            final Map<String, String> newAttributes = new HashMap<>();
-            final Set<String> conflicting = new HashSet<>();
-
-            for (final FlowFile flowFile : flowFiles) {
-                for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) {
-                    final String name = attributeEntry.getKey();
-                    final String value = attributeEntry.getValue();
-
-                    final String existingValue = newAttributes.get(name);
-                    if (existingValue != null && !existingValue.equals(value)) {
-                        conflicting.add(name);
-                    } else {
-                        newAttributes.put(name, value);
-                    }
-                }
-            }
-
-            for (final String attributeToRemove : conflicting) {
-                newAttributes.remove(attributeToRemove);
-            }
-
-            // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
-            newAttributes.remove(CoreAttributes.UUID.key());
-            return newAttributes;
-        }
-    }
-
-    private static class KeepCommonAttributeStrategy implements AttributeStrategy {
-
-        @Override
-        public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
-            final Map<String, String> result = new HashMap<>();
-
-            //trivial cases
-            if (flowFiles == null || flowFiles.isEmpty()) {
-                return result;
-            } else if (flowFiles.size() == 1) {
-                result.putAll(flowFiles.iterator().next().getAttributes());
-            }
-
-            /*
-             * Start with the first attribute map and only put an entry to the
-             * resultant map if it is common to every map.
-             */
-            final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes();
-
-            outer:
-            for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
-                final String key = mapEntry.getKey();
-                final String value = mapEntry.getValue();
-
-                for (final FlowFile flowFile : flowFiles) {
-                    final Map<String, String> currMap = flowFile.getAttributes();
-                    final String curVal = currMap.get(key);
-                    if (curVal == null || !curVal.equals(value)) {
-                        continue outer;
-                    }
-                }
-                result.put(key, value);
-            }
 
-            // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
-            result.remove(CoreAttributes.UUID.key());
-            return result;
-        }
-    }
 
     private static class FragmentComparator implements Comparator<FlowFile> {
 
@@ -1079,8 +991,4 @@ public class MergeContent extends BinFiles {
         List<FlowFile> getUnmergedFlowFiles();
     }
 
-    private interface AttributeStrategy {
-
-        Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
new file mode 100644
index 0000000..b0e3f48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
+import org.apache.nifi.processors.standard.merge.RecordBinManager;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+
+@SideEffectFree
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"merge", "record", "content", "correlation", "stream", "event"})
+@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. "
+    + "This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into "
+    + "a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two "
+    + "FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property "
+    + "is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.")
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+        + "All FlowFiles with the same value for this attribute will be bundled together."),
+    @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+        + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+        + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+        + "in the given bundle."),
+})
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The merged FlowFile will have a 'record.count' attribute indicating the number of records "
+        + "that were written to the FlowFile."),
+    @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"),
+    @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
+    @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+        + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
+    @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
+})
+@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
+public class MergeRecord extends AbstractSessionFactoryProcessor {
+    // Names of the fragment.* attributes defined by FragmentAttributes. FRAGMENT_ID_ATTRIBUTE is what
+    // getGroupId() returns as the group id when the Defragment merge strategy is selected.
+    public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
+
+    // Names of the attributes that this processor's @WritesAttributes documents for the merged FlowFile
+    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+
+    /** Default merge strategy: fill bins with like FlowFiles, optionally correlated by an attribute. */
+    public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
+        "Bin-Packing Algorithm",
+        "Bin-Packing Algorithm",
+        "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
+            + "their attributes (if the <Correlation Attribute Name> property is set)");
+    /** Merge strategy that groups FlowFiles by their fragment.identifier attribute. */
+    public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
+        "Defragment",
+        "Defragment",
+        "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+            + "have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" "
+            + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. The ordering of "
+            + "the Records that are output is not guaranteed.");
+
+
+    /** Required Controller Service (a RecordReaderFactory) used to parse each incoming FlowFile. */
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+
+    /** Required Controller Service (a RecordSetWriterFactory) used to write the merged records. */
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
+    /** Selects between the two allowable merge strategies; defaults to bin-packing. */
+    public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
+        .name("merge-strategy")
+        .displayName("Merge Strategy")
+        .description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by "
+            + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+            + "chosen FlowFiles")
+        .required(true)
+        .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
+        .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
+        .build();
+
+    /** Optional name of an attribute whose value is appended to the schema text to form the group id (see getGroupId). */
+    public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("correlation-attribute-name")
+        .displayName("Correlation Attribute Name")
+        .description("If specified, two FlowFiles will be binned together only if they have the same value for "
+            + "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
+        .required(false)
+        .expressionLanguageSupported(false)
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+        .defaultValue(null)
+        .build();
+    /** Required lower bound on bin size, expressed as a data size; defaults to "0 B". */
+    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
+        .name("min-bin-size")
+        .displayName("Minimum Bin Size")
+        .description("The minimum size for the bin")
+        .required(true)
+        .defaultValue("0 B")
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+    /** Optional soft upper bound on bin size, expressed as a data size; no default. */
+    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
+        .name("max-bin-size")
+        .displayName("Maximum Bin Size")
+        .description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, "
+            + "all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in the last input FlowFile.")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+
+    /** Required lower bound on the number of records in a bin; defaults to 1. */
+    public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder()
+        .name("min-records")
+        .displayName("Minimum Number of Records")
+        .description("The minimum number of records to include in a bin")
+        .required(true)
+        .defaultValue("1")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+    /** Optional soft upper bound on the number of records in a bin; defaults to 1000. */
+    public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder()
+        .name("max-records")
+        .displayName("Maximum Number of Records")
+        .description("The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, "
+            + "so this limit may be exceeded by up to the number of records in the last input FlowFile.")
+        .required(false)
+        .defaultValue("1000")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+    /** Required cap on the number of bins held in memory at once; defaults to 10. */
+    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
+        .name("max.bin.count")
+        .displayName("Maximum Number of Bins")
+        .description("Specifies the maximum number of bins that can be held in memory at any one time. "
+            + "This number should not be smaller than the maximum number of concurrent threads for this Processor, "
+            + "or the bins that are created will often consist only of a single incoming FlowFile.")
+        .defaultValue("10")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    /** Optional time period; onTrigger reads it in nanoseconds and hands it to the bin manager via setMaxBinAge. */
+    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
+        .name("max-bin-age")
+        .displayName("Max Bin Age")
+        .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
+            + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
+        .required(false)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .build();
+
+
+
+    /** Destination of the FlowFile that holds the merged records. */
+    public static final Relationship REL_MERGED = new Relationship.Builder()
+        .name("merged")
+        .description("The FlowFile containing the merged records")
+        .build();
+    /** Destination of the input FlowFiles that went into a bundle. */
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("The FlowFiles that were used to create the bundle")
+        .build();
+    /** Destination of FlowFiles that could not be binned or merged; onTrigger routes here when binning throws. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure")
+        .build();
+
+    // Shared bin manager: created lazily by onTrigger (via compare-and-set) and cleared by resetState.
+    private final AtomicReference<RecordBinManager> binManager = new AtomicReference<>();
+
+
+    /**
+     * Lists the properties this processor exposes, in the order they are added here.
+     *
+     * @return a new mutable list of the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+        // Record reading / writing services
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+
+        // How FlowFiles are grouped and how their attributes are merged
+        descriptors.add(MERGE_STRATEGY);
+        descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+        descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
+
+        // Bin thresholds
+        descriptors.add(MIN_RECORDS);
+        descriptors.add(MAX_RECORDS);
+        descriptors.add(MIN_SIZE);
+        descriptors.add(MAX_SIZE);
+        descriptors.add(MAX_BIN_AGE);
+        descriptors.add(MAX_BIN_COUNT);
+
+        return descriptors;
+    }
+
+
+    /**
+     * Lists the relationships this processor can route to: original, failure and merged.
+     *
+     * @return a new mutable set containing the three relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> supported = new HashSet<>();
+        supported.add(REL_ORIGINAL);
+        supported.add(REL_FAILURE);
+        supported.add(REL_MERGED);
+        return supported;
+    }
+
+
+    /**
+     * Discards the current bin manager when the processor is stopped, so that the next
+     * onTrigger call builds a fresh one.
+     *
+     * The reference is cleared atomically before the manager is purged. This way the stale
+     * manager is never left installed if purge() throws, and no other caller can obtain it
+     * between the read and the reset.
+     */
+    @OnStopped
+    public final void resetState() {
+        final RecordBinManager manager = binManager.getAndSet(null);
+        if (manager != null) {
+            manager.purge();
+        }
+    }
+
+
+    /**
+     * Pulls a batch of FlowFiles from the queue, hands each one to the bin manager and then asks the
+     * manager to complete any expired bins. FlowFiles that cannot be binned are routed to 'failure'.
+     * Yields when the queue produced nothing.
+     *
+     * @param context the process context used to read property values
+     * @param sessionFactory factory for the session used by this invocation (also handed to the bin manager)
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Lazily install the shared bin manager. If another thread wins the compare-and-set,
+        // adopt whatever is installed; loop again if that turns out to be null.
+        RecordBinManager manager = binManager.get();
+        while (manager == null) {
+            final RecordBinManager candidate = new RecordBinManager(context, sessionFactory, getLogger());
+            candidate.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+            manager = binManager.compareAndSet(null, candidate) ? candidate : binManager.get();
+        }
+
+        final ProcessSession session = sessionFactory.createSession();
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+        if (getLogger().isDebugEnabled()) {
+            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
+        }
+
+        // The 'block' flag is passed through to RecordBinManager.add(); it is set for the Defragment
+        // strategy and whenever a correlation attribute is configured.
+        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+        final boolean block = MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy) || context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet();
+
+        try {
+            for (final FlowFile flowFile : flowFiles) {
+                try {
+                    binFlowFile(context, flowFile, session, manager, block);
+                } catch (final Exception e) {
+                    // A failure on one FlowFile must not stop the rest of the batch
+                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+        } finally {
+            // Always commit so that transfers to 'failure' made above take effect
+            session.commit();
+        }
+
+        try {
+            manager.completeExpiredBins();
+        } catch (final Exception e) {
+            getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+        }
+
+        if (flowFiles.isEmpty()) {
+            getLogger().debug("No FlowFiles to bin; will yield");
+            context.yield();
+        }
+    }
+
+
+    /**
+     * Opens the FlowFile with the configured Record Reader, derives its group id from the reader's
+     * schema and adds it to the bin manager under that id.
+     *
+     * @param context the process context used to look up the Record Reader
+     * @param flowFile the FlowFile to bin
+     * @param session the session used to read the FlowFile content
+     * @param binManager the manager that receives the FlowFile
+     * @param block passed through unchanged to {@code binManager.add}
+     * @throws ProcessException wrapping any MalformedRecordException, IOException or SchemaNotFoundException
+     */
+    private void binFlowFile(final ProcessContext context, final FlowFile flowFile, final ProcessSession session, final RecordBinManager binManager, final boolean block) {
+        final RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        try (final InputStream content = session.read(flowFile);
+            final RecordReader recordReader = factory.createRecordReader(flowFile, content, getLogger())) {
+
+            final String groupId = getGroupId(context, flowFile, recordReader.getSchema(), session);
+            getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, flowFile});
+
+            binManager.add(groupId, flowFile, recordReader, session, block);
+        } catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
+            // Surface checked failures as ProcessException; onTrigger routes the FlowFile to 'failure'
+            throw new ProcessException(e);
+        }
+    }
+
+
+    /**
+     * Determines the id of the group (bin key) that a FlowFile belongs to.
+     *
+     * With the Defragment strategy the id is the value of the fragment identifier attribute (which is
+     * null if the FlowFile lacks it). Otherwise the id is the schema text, followed by the value of the
+     * correlation attribute when one is configured and present on the FlowFile.
+     *
+     * @param context the process context used to read the merge strategy and correlation attribute name
+     * @param flowFile the FlowFile being binned
+     * @param schema the schema reported by the Record Reader for this FlowFile
+     * @param session not used by this implementation
+     * @return the group id, possibly null in Defragment mode
+     */
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
+        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+            return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
+        }
+
+        // Use the schema's own text when it has one; otherwise render it as an Avro schema string
+        final String schemaText = schema.getSchemaText().orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());
+
+        final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+        if (correlationAttributeName == null) {
+            return schemaText;
+        }
+
+        final String correlationValue = flowFile.getAttribute(correlationAttributeName);
+        return correlationValue == null ? schemaText : schemaText + correlationValue;
+    }
+
+    /**
+     * Reports how many bins the current bin manager holds.
+     *
+     * @return the manager's bin count, or 0 when no manager is installed (before the first
+     *         onTrigger call or after resetState)
+     */
+    int getBinCount() {
+        final RecordBinManager manager = binManager.get();
+        return manager == null ? 0 : manager.getBinCount();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
new file mode 100644
index 0000000..e3c9da9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Strategy for deriving one attribute map from the attributes of a list of FlowFiles.
+ */
+public interface AttributeStrategy {
+
+    /**
+     * Computes the merged attributes for the given FlowFiles.
+     *
+     * @param flowFiles the FlowFiles whose attributes are to be combined
+     * @return the resulting attribute names and values
+     */
+    Map<String, String> getMergedAttributes(List<FlowFile> flowFiles);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
new file mode 100644
index 0000000..221eb04
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+
+/**
+ * Holds the 'Attribute Strategy' property and its allowable values, plus a factory that maps the
+ * configured value to an {@link AttributeStrategy} implementation.
+ */
+public class AttributeStrategyUtil {
+
+    /** Keep only attributes present with the same value on every FlowFile. This is the default. */
+    public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Attributes", "Keep Only Common Attributes",
+        "Any attribute that is not the same on all FlowFiles in a bin will be dropped. Those that are the same across all FlowFiles will be retained.");
+
+    /** Keep every attribute whose value does not conflict between FlowFiles. */
+    public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_UNIQUE = new AllowableValue("Keep All Unique Attributes", "Keep All Unique Attributes",
+        "Any attribute that has the same value for all FlowFiles in a bin, or has no value for a FlowFile, will be kept. For example, if a bin consists of 3 FlowFiles "
+            + "and 2 of them have a value of 'hello' for the 'greeting' attribute and the third FlowFile has no 'greeting' attribute then the outbound FlowFile will get "
+            + "a 'greeting' attribute with the value 'hello'.");
+
+    /** Required property that selects one of the two strategies above. */
+    public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
+        .required(true)
+        .name("Attribute Strategy")
+        .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
+            + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
+            + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
+            + "value, will be preserved.")
+        .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
+        .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON.getValue())
+        .build();
+
+
+    /**
+     * Creates the strategy that matches the configured 'Attribute Strategy' property value.
+     *
+     * @param context the process context to read the property from
+     * @return a new strategy instance, or null if the configured value matches neither allowable value
+     */
+    public static AttributeStrategy strategyFor(ProcessContext context) {
+        final String configured = context.getProperty(ATTRIBUTE_STRATEGY).getValue();
+
+        if (ATTRIBUTE_STRATEGY_ALL_UNIQUE.getValue().equals(configured)) {
+            return new KeepUniqueAttributeStrategy();
+        } else if (ATTRIBUTE_STRATEGY_ALL_COMMON.getValue().equals(configured)) {
+            return new KeepCommonAttributeStrategy();
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
new file mode 100644
index 0000000..5e7920a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+/**
+ * Attribute strategy that keeps only the attributes every FlowFile carries with an identical value.
+ */
+public class KeepCommonAttributeStrategy implements AttributeStrategy {
+
+    /**
+     * Computes the attributes shared, with equal values, by all of the given FlowFiles.
+     *
+     * @param flowFiles the FlowFiles to inspect; may be null or empty
+     * @return a new map of the common attributes, never containing the UUID attribute;
+     *         empty when {@code flowFiles} is null or empty
+     */
+    @Override
+    public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
+        final Map<String, String> result = new HashMap<>();
+
+        // Trivial case: nothing to merge
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return result;
+        }
+
+        final Map<String, String> firstMap = flowFiles.get(0).getAttributes();
+
+        if (flowFiles.size() == 1) {
+            // Trivial case: with a single FlowFile every attribute is common, so copy them
+            // and skip the comparison pass entirely.
+            result.putAll(firstMap);
+        } else {
+            /*
+             * Start with the first attribute map and only put an entry to the
+             * resultant map if it is common to every map.
+             */
+            outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+                final String key = mapEntry.getKey();
+                final String value = mapEntry.getValue();
+
+                for (final FlowFile flowFile : flowFiles) {
+                    final String curVal = flowFile.getAttributes().get(key);
+                    if (curVal == null || !curVal.equals(value)) {
+                        continue outer;
+                    }
+                }
+                result.put(key, value);
+            }
+        }
+
+        // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
+        result.remove(CoreAttributes.UUID.key());
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
new file mode 100644
index 0000000..86fb198
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+/**
+ * Attribute strategy that keeps every attribute seen on any FlowFile, except those for which two
+ * FlowFiles carry different values.
+ */
+public class KeepUniqueAttributeStrategy implements AttributeStrategy {
+
+    /**
+     * Collects the attributes of all given FlowFiles and drops any attribute whose values conflict.
+     *
+     * @param flowFiles the FlowFiles to inspect; may be null or empty
+     * @return a new map of the non-conflicting attributes, never containing the UUID attribute;
+     *         empty when {@code flowFiles} is null or empty
+     */
+    @Override
+    public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) {
+        final Map<String, String> newAttributes = new HashMap<>();
+
+        // Same contract as KeepCommonAttributeStrategy: null or empty input yields an empty map
+        if (flowFiles == null || flowFiles.isEmpty()) {
+            return newAttributes;
+        }
+
+        final Set<String> conflicting = new HashSet<>();
+
+        for (final FlowFile flowFile : flowFiles) {
+            for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) {
+                final String name = attributeEntry.getKey();
+                final String value = attributeEntry.getValue();
+
+                final String existingValue = newAttributes.get(name);
+                if (existingValue != null && !existingValue.equals(value)) {
+                    // Remember the clash; removal is deferred so later FlowFiles cannot re-add the name
+                    conflicting.add(name);
+                } else {
+                    newAttributes.put(name, value);
+                }
+            }
+        }
+
+        for (final String attributeToRemove : conflicting) {
+            newAttributes.remove(attributeToRemove);
+        }
+
+        // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent.
+        newAttributes.remove(CoreAttributes.UUID.key());
+        return newAttributes;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
new file mode 100644
index 0000000..0b4cf31
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
+public class RecordBin {
+    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+
+    private final ComponentLog logger;
+    private final ProcessSession session;
+    private final RecordSetWriterFactory writerFactory;
+    private final RecordBinThresholds thresholds;
+    private final ProcessContext context;
+
+    private final List<FlowFile> flowFiles = new ArrayList<>();
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+    private final long creationNanos = System.nanoTime();
+
+    private FlowFile merged;
+    private RecordSetWriter recordWriter;
+    private ByteCountingOutputStream out;
+    private int recordCount = 0;
+    private volatile boolean complete = false;
+
+    private static final AtomicLong idGenerator = new AtomicLong(0L);
+    private final long id = idGenerator.getAndIncrement();
+
+
+    /**
+     * Creates a bin that writes its merged output to a FlowFile newly created in the given session.
+     *
+     * @param context used to look up the configured Record Writer and, later, the attribute strategy
+     * @param session the session that owns the merged FlowFile and receives the FlowFiles offered to this bin
+     * @param logger the logger to report to
+     * @param thresholds the limits that decide when this bin is full enough, full, or expired
+     */
+    public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) {
+        this.context = context;
+        this.session = session;
+        this.logger = logger;
+        this.thresholds = thresholds;
+
+        this.writerFactory = context.getProperty(MergeRecord.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        this.merged = session.create();
+    }
+
+    /**
+     * Indicates whether this bin was created before the given bin.
+     *
+     * @param other the bin to compare against
+     * @return <code>true</code> if this bin's creation timestamp precedes the other bin's
+     */
+    public boolean isOlderThan(final RecordBin other) {
+        // System.nanoTime() values may be negative and may wrap, so they must be compared
+        // through their difference rather than with a direct '<'.
+        return creationNanos - other.creationNanos < 0;
+    }
+
+    /**
+     * Indicates whether more than the given amount of time has passed since this bin was created.
+     *
+     * @param period the age to compare against
+     * @param unit the unit of <code>period</code>
+     * @return <code>true</code> if the bin's age strictly exceeds the given period
+     */
+    public boolean isOlderThan(final long period, final TimeUnit unit) {
+        final long thresholdNanos = unit.toNanos(period);
+
+        // Compare the elapsed time against the threshold instead of subtracting the threshold from
+        // System.nanoTime(): the latter overflows when the threshold is very large (toNanos saturates
+        // at Long.MAX_VALUE) and the current nanoTime value happens to be negative.
+        final long ageNanos = System.nanoTime() - creationNanos;
+        return ageNanos > thresholdNanos;
+    }
+
+    /**
+     * @return <code>true</code> once this bin has been completed, failed, or rolled back and therefore accepts no more FlowFiles
+     */
+    public boolean isComplete() {
+        return this.complete;
+    }
+
+    public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block)
+        throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+        if (isComplete()) {
+            logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
+            return false;
+        }
+
+        final boolean locked;
+        if (block) {
+            writeLock.lock();
+            locked = true;
+        } else {
+            locked = writeLock.tryLock();
+        }
+
+        if (!locked) {
+            logger.debug("RecordBin.offer for id={} returning false because failed to get lock for {}", new Object[] {flowFile.getId(), this});
+            return false;
+        }
+
+        boolean flowFileMigrated = false;
+
+        try {
+            if (isComplete()) {
+                logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
+                return false;
+            }
+
+            logger.debug("Migrating id={} to {}", new Object[] {flowFile.getId(), this});
+
+            Record record;
+            while ((record = recordReader.nextRecord()) != null) {
+                if (recordWriter == null) {
+                    final OutputStream rawOut = session.write(merged);
+                    logger.debug("Created OutputStream using session {} for {}", new Object[] {session, this});
+
+                    this.out = new ByteCountingOutputStream(rawOut);
+
+                    recordWriter = writerFactory.createWriter(logger, record.getSchema(), flowFile, out);
+                    recordWriter.beginRecordSet();
+                }
+
+                recordWriter.write(record);
+                recordCount++;
+            }
+
+            // This will be closed by the MergeRecord class anyway but we have to close it
+            // here because it needs to be closed before we are able to migrate the FlowFile
+            // to a new Session.
+            recordReader.close();
+            flowFileSession.migrate(this.session, Collections.singleton(flowFile));
+            flowFileMigrated = true;
+            this.flowFiles.add(flowFile);
+
+            if (isFull()) {
+                logger.debug(this + " is now full. Completing bin.");
+                complete("Bin is full");
+            } else if (isOlderThan(thresholds.getMaxBinMillis(), TimeUnit.MILLISECONDS)) {
+                logger.debug(this + " is now expired. Completing bin.");
+                complete("Bin is older than " + thresholds.getMaxBinAge());
+            }
+
+            return true;
+        } catch (final Exception e) {
+            logger.error("Failed to create merged FlowFile from " + (flowFiles.size() + 1) + " input FlowFiles; routing originals to failure", e);
+
+            try {
+                // This will be closed by the MergeRecord class anyway but we have to close it
+                // here because it needs to be closed before we are able to migrate the FlowFile
+                // to a new Session.
+                recordReader.close();
+
+                if (recordWriter != null) {
+                    recordWriter.close();
+                }
+                if (this.out != null) {
+                    this.out.close();
+                }
+
+                if (!flowFileMigrated) {
+                    flowFileSession.migrate(this.session, Collections.singleton(flowFile));
+                    this.flowFiles.add(flowFile);
+                }
+            } finally {
+                complete = true;
+                session.remove(merged);
+                session.transfer(flowFiles, MergeRecord.REL_FAILURE);
+                session.commit();
+            }
+
+            return true;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public boolean isFull() {
+        readLock.lock();
+        try {
+            if (!isFullEnough()) {
+                return false;
+            }
+
+            int maxRecords;
+            final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
+            if (recordCountAttribute.isPresent()) {
+                final Optional<String> recordCountValue = flowFiles.stream()
+                    .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null)
+                    .map(ff -> ff.getAttribute(recordCountAttribute.get()))
+                    .findFirst();
+
+                if (!recordCountValue.isPresent()) {
+                    return false;
+                }
+
+                try {
+                    maxRecords = Integer.parseInt(recordCountValue.get());
+                } catch (final NumberFormatException e) {
+                    maxRecords = 1;
+                }
+            } else {
+                maxRecords = thresholds.getMaxRecords();
+            }
+
+            if (recordCount >= maxRecords) {
+                return true;
+            }
+
+            if (out.getBytesWritten() >= thresholds.getMaxBytes()) {
+                return true;
+            }
+
+            return false;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Indicates whether this bin has reached its lower limits. An empty bin is never full enough. When a
+     * record-count attribute is configured, the required record count is read from the first FlowFile in
+     * the bin (a missing or non-numeric value is treated as 1); otherwise the configured minimum is used.
+     *
+     * @return <code>true</code> if both the required record count and the minimum number of bytes have been reached
+     */
+    public boolean isFullEnough() {
+        readLock.lock();
+        try {
+            if (flowFiles.isEmpty()) {
+                return false;
+            }
+
+            int minimumRecords;
+            final Optional<String> countAttribute = thresholds.getRecordCountAttribute();
+            if (!countAttribute.isPresent()) {
+                minimumRecords = thresholds.getMinRecords();
+            } else {
+                final String declaredCount = flowFiles.get(0).getAttribute(countAttribute.get());
+                try {
+                    // parseInt also throws NumberFormatException for a null value.
+                    minimumRecords = Integer.parseInt(declaredCount);
+                } catch (final NumberFormatException nfe) {
+                    minimumRecords = 1;
+                }
+            }
+
+            if (recordCount < minimumRecords) {
+                return false;
+            }
+
+            return out.getBytesWritten() >= thresholds.getMinBytes();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+
+    /**
+     * Marks this bin as complete, closes the record writer if one was created, and rolls back the
+     * bin's session. A failure to close the writer is logged and does not prevent the rollback.
+     */
+    public void rollback() {
+        // Flag the bin before waiting for the lock so that pending offers are rejected.
+        complete = true;
+        logger.debug("Marked {} as complete because rollback() was called", new Object[] {this});
+
+        writeLock.lock();
+        try {
+            try {
+                if (recordWriter != null) {
+                    recordWriter.close();
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to close Record Writer", ioe);
+            }
+
+            session.rollback();
+
+            if (logger.isDebugEnabled()) {
+                final List<String> ids = new ArrayList<>(flowFiles.size());
+                for (final FlowFile flowFile : flowFiles) {
+                    ids.add(" id=" + flowFile.getId() + ",");
+                }
+                logger.debug("Rolled back bin {} containing input FlowFiles {}", new Object[] {this, ids});
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private long getBinAge() {
+        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationNanos);
+    }
+
+    /**
+     * Marks this bin as complete, discards the merged FlowFile, routes every input FlowFile to the
+     * failure relationship, and commits the bin's session. A failure to close the record writer is
+     * logged and does not prevent the FlowFiles from being routed.
+     */
+    private void fail() {
+        complete = true;
+        logger.debug("Marked {} as complete because fail() was called", new Object[] {this});
+
+        writeLock.lock();
+        try {
+            try {
+                if (recordWriter != null) {
+                    recordWriter.close();
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to close Record Writer", ioe);
+            }
+
+            session.remove(merged);
+            session.transfer(flowFiles, MergeRecord.REL_FAILURE);
+            session.commit();
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public void complete(final String completionReason) throws IOException {
+        writeLock.lock();
+        try {
+            if (isComplete()) {
+                logger.debug("Cannot complete {} because it is already completed", new Object[] {this});
+                return;
+            }
+
+            complete = true;
+            logger.debug("Marked {} as complete because complete() was called", new Object[] {this});
+
+            final WriteResult writeResult = recordWriter.finishRecordSet();
+            recordWriter.close();
+            logger.debug("Closed Record Writer using session {} for {}", new Object[] {session, this});
+
+            if (flowFiles.isEmpty()) {
+                session.remove(merged);
+                return;
+            }
+
+            // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin.
+            final Optional<String> countAttr = thresholds.getRecordCountAttribute();
+            if (countAttr.isPresent()) {
+                // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value.
+                Integer expectedBinCount = null;
+                for (final FlowFile flowFile : flowFiles) {
+                    final String countVal = flowFile.getAttribute(countAttr.get());
+                    if (countVal == null) {
+                        continue;
+                    }
+
+                    final int count;
+                    try {
+                        count = Integer.parseInt(countVal);
+                    } catch (final NumberFormatException nfe) {
+                        logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number",
+                            new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile});
+                        fail();
+                        return;
+                    }
+
+                    if (expectedBinCount != null && count != expectedBinCount) {
+                        logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}",
+                            new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount});
+                        fail();
+                        return;
+                    }
+
+                    expectedBinCount = count;
+                }
+
+                if (expectedBinCount == null) {
+                    logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles",
+                        new Object[] {flowFiles.size(), countAttr.get()});
+                    fail();
+                    return;
+                }
+
+                if (expectedBinCount != flowFiles.size()) {
+                    logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted "
+                        + "(due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).",
+                        new Object[] {flowFiles.size(), countAttr.get(), expectedBinCount, flowFiles.size(), expectedBinCount});
+                    fail();
+                    return;
+                }
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+
+            final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context);
+            final Map<String, String> mergedAttributes = attributeStrategy.getMergedAttributes(flowFiles);
+            attributes.putAll(mergedAttributes);
+
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
+            attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
+            attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
+
+            merged = session.putAllAttributes(merged, attributes);
+
+            session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
+            session.transfer(merged, MergeRecord.REL_MERGED);
+            session.transfer(flowFiles, MergeRecord.REL_ORIGINAL);
+            session.adjustCounter("Records Merged", writeResult.getRecordCount(), false);
+            session.commit();
+
+            if (logger.isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                logger.debug("Completed bin {} with {} records with Merged FlowFile {} using input FlowFiles {}", new Object[] {this, writeResult.getRecordCount(), merged, ids});
+            }
+        } catch (final Exception e) {
+            session.rollback(true);
+            throw e;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * @return a description of this bin listing its number of FlowFiles, whether it is full, whether it is complete, and its id
+     */
+    @Override
+    public String toString() {
+        readLock.lock();
+        try {
+            final StringBuilder description = new StringBuilder("RecordBin[size=");
+            description.append(flowFiles.size());
+            description.append(", full=").append(isFull());
+            description.append(", isComplete=").append(isComplete());
+            description.append(", id=").append(id);
+            description.append("]");
+            return description.toString();
+        } finally {
+            readLock.unlock();
+        }
+    }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
new file mode 100644
index 0000000..8496a4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.MergeContent;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+
+public class RecordBinManager {
+
+    private final ProcessContext context;
+    private final ProcessSessionFactory sessionFactory;
+    private final ComponentLog logger;
+    private final int maxBinCount;
+
+    private final AtomicLong maxBinAgeNanos = new AtomicLong(Long.MAX_VALUE);
+    private final Map<String, List<RecordBin>> groupBinMap = new HashMap<>(); // guarded by lock
+    private final Lock lock = new ReentrantLock();
+
+    private final AtomicInteger binCount = new AtomicInteger(0);
+
+    /**
+     * Creates a manager that builds its bins from the given context and session factory.
+     *
+     * @param context used to read the bin-related processor properties
+     * @param sessionFactory used to create a dedicated session for every new bin
+     * @param logger the logger to report to
+     */
+    public RecordBinManager(final ProcessContext context, final ProcessSessionFactory sessionFactory, final ComponentLog logger) {
+        this.context = context;
+        this.sessionFactory = sessionFactory;
+        this.logger = logger;
+
+        // No configured maximum means the number of bins is effectively unbounded.
+        final Integer configuredMax = context.getProperty(MergeRecord.MAX_BIN_COUNT).asInteger();
+        if (configuredMax == null) {
+            this.maxBinCount = Integer.MAX_VALUE;
+        } else {
+            this.maxBinCount = configuredMax.intValue();
+        }
+    }
+
+    /**
+     * Rolls back every bin that is currently tracked and forgets all of them, resetting the bin count to zero.
+     * Must be called only when there are no active threads modifying the bins.
+     */
+    public void purge() {
+        lock.lock();
+        try {
+            groupBinMap.values().forEach(bins -> bins.forEach(RecordBin::rollback));
+            groupBinMap.clear();
+            binCount.set(0);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    /**
+     * Sets the age after which bins are treated as expired by {@link #completeExpiredBins()}.
+     *
+     * @param timePeriod the maximum age, or <code>null</code> to use <code>Long.MAX_VALUE</code> nanoseconds
+     * @param timeUnit the unit of <code>timePeriod</code>; not consulted when <code>timePeriod</code> is <code>null</code>
+     */
+    public void setMaxBinAge(final Long timePeriod, final TimeUnit timeUnit) {
+        final long nanos = (timePeriod == null) ? Long.MAX_VALUE : timeUnit.toNanos(timePeriod);
+        maxBinAgeNanos.set(nanos);
+    }
+
+
+    /**
+     * @return the current value of the counter that tracks how many bins this manager holds
+     */
+    public int getBinCount() {
+        return this.binCount.get();
+    }
+
+    /**
+     * Adds the given FlowFile to the first bin of its group that accepts it, or creates a new bin for the
+     * group if none does. A bin that becomes complete as a result is no longer tracked. If registering a
+     * new bin pushes the bin count above the configured maximum, the oldest bin is completed.
+     *
+     * @param groupIdentifier the group to which the flow file belongs; can be null
+     * @param flowFile flowFile to bin
+     * @param reader RecordReader to use for reading FlowFile
+     * @param session the ProcessSession to which the FlowFiles belong
+     * @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
+     *            that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
+     *
+     * @throws SchemaNotFoundException if unable to find the schema for the record writer
+     * @throws MalformedRecordException if unable to read a record
+     * @throws IOException if there is an IO problem reading from the stream or writing to the stream
+     */
+    public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
+        throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+        final List<RecordBin> groupBins;
+        lock.lock();
+        try {
+            // The per-group list is a CopyOnWriteArrayList so that it can be traversed below without holding
+            // the lock: bin.offer() is very expensive and must not run while the manager is locked. If this
+            // turns out to perform poorly, an alternative is a synchronized List combined with some sort of
+            // bin.tryLock() that only locks when the FlowFile should be added, so that iteration can stop
+            // before the expensive part is performed, always followed by an unlock.
+            groupBins = groupBinMap.computeIfAbsent(groupIdentifier, ignored -> new CopyOnWriteArrayList<>());
+        } finally {
+            lock.unlock();
+        }
+
+        RecordBin target = null;
+        for (final RecordBin candidate : groupBins) {
+            if (!candidate.offer(flowFile, reader, session, block)) {
+                continue;
+            }
+
+            target = candidate;
+            logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), candidate});
+            break;
+        }
+
+        // The bin is removed only after the traversal above has finished.
+        if (target != null) {
+            if (target.isComplete()) {
+                removeBins(groupIdentifier, Collections.singletonList(target));
+            }
+
+            return;
+        }
+
+        // No existing bin took the FlowFile, so create a new one. A new bin is always offered to in blocking mode.
+        final RecordBin newBin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds());
+        if (!newBin.offer(flowFile, reader, session, true)) {
+            session.rollback();
+            throw new RuntimeException("Attempted to add " + flowFile + " to a new bin but failed. This is unexpected. Will roll back session and try again.");
+        }
+
+        logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), newBin});
+
+        // A bin that completed on its very first FlowFile never needs to be tracked.
+        if (newBin.isComplete()) {
+            return;
+        }
+
+        final int updatedBinCount = binCount.incrementAndGet();
+
+        lock.lock();
+        try {
+            // The list obtained at the top of this method cannot be reused: the lock was released while
+            // writing to the bins, so another thread may have removed that list from the map once all of
+            // its bins completed. Look the list up again (creating it if necessary) under the lock;
+            // otherwise the new bin could be added to an orphaned list and lost.
+            groupBinMap.computeIfAbsent(groupIdentifier, ignored -> new CopyOnWriteArrayList<>()).add(newBin);
+        } finally {
+            lock.unlock();
+        }
+
+        if (updatedBinCount > maxBinCount) {
+            completeOldestBin();
+        }
+    }
+
+
+    /**
+     * Reads the bin-related processor properties and packages them as thresholds for a new bin. An unset
+     * maximum size or maximum bin age is represented as <code>Long.MAX_VALUE</code>. The record-count
+     * attribute is only set when the Defragment merge strategy is selected.
+     *
+     * @return the thresholds to apply to a newly created bin
+     */
+    private RecordBinThresholds createThresholds() {
+        final int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
+        final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
+        final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
+
+        final PropertyValue maxSizeProperty = context.getProperty(MergeRecord.MAX_SIZE);
+        final long maxBytes;
+        if (maxSizeProperty.isSet()) {
+            maxBytes = maxSizeProperty.asDataSize(DataUnit.B).longValue();
+        } else {
+            maxBytes = Long.MAX_VALUE;
+        }
+
+        final PropertyValue maxAgeProperty = context.getProperty(MergeRecord.MAX_BIN_AGE);
+        final String maxBinAge = maxAgeProperty.getValue();
+        final long maxBinMillis;
+        if (maxAgeProperty.isSet()) {
+            maxBinMillis = maxAgeProperty.asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+        } else {
+            maxBinMillis = Long.MAX_VALUE;
+        }
+
+        final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
+        final boolean defragment = MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy);
+        final String recordCountAttribute = defragment ? MergeContent.FRAGMENT_COUNT_ATTRIBUTE : null;
+
+        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+    }
+
+
+    /**
+     * Finds the oldest bin across all groups, stops tracking it, and completes it. Does nothing when no
+     * bins are tracked. The bin is completed after the manager's lock has been released.
+     *
+     * @throws IOException if completing the bin fails
+     */
+    public void completeOldestBin() throws IOException {
+        RecordBin eldest = null;
+
+        lock.lock();
+        try {
+            String eldestGroup = null;
+
+            for (final Map.Entry<String, List<RecordBin>> entry : groupBinMap.entrySet()) {
+                final String groupId = entry.getKey();
+
+                for (final RecordBin candidate : entry.getValue()) {
+                    if (eldest != null && !candidate.isOlderThan(eldest)) {
+                        continue;
+                    }
+
+                    eldest = candidate;
+                    eldestGroup = groupId;
+                }
+            }
+
+            if (eldest == null) {
+                return;
+            }
+
+            removeBins(eldestGroup, Collections.singletonList(eldest));
+        } finally {
+            lock.unlock();
+        }
+
+        logger.debug("Completing Bin " + eldest + " because the maximum number of bins has been exceeded");
+        eldest.complete("Maximum number of bins has been exceeded");
+    }
+
+
+    /**
+     * Completes every tracked bin that is older than the configured maximum bin age and then stops
+     * tracking it. Expired bins are collected while holding the manager's lock but are completed after
+     * the lock has been released.
+     *
+     * @throws IOException if completing a bin fails; bins of the group being processed at that point are not removed
+     */
+    public void completeExpiredBins() throws IOException {
+        final long maxNanos = maxBinAgeNanos.get();
+        final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
+
+        lock.lock();
+        try {
+            for (final Map.Entry<String, List<RecordBin>> entry : groupBinMap.entrySet()) {
+                final String key = entry.getKey();
+                final List<RecordBin> bins = entry.getValue();
+
+                for (final RecordBin bin : bins) {
+                    if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+                        final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
+                        expiredBinsForKey.add(bin);
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
+            final String key = entry.getKey();
+            final List<RecordBin> expiredBins = entry.getValue();
+
+            for (final RecordBin bin : expiredBins) {
+                // Pass the bin as the argument for the '{}' placeholder so that it appears in the log.
+                logger.debug("Completing Bin {} because it has expired", new Object[] {bin});
+                bin.complete("Bin has reached Max Bin Age");
+            }
+
+            removeBins(key, expiredBins);
+        }
+    }
+
+    /**
+     * Stops tracking the given bins of the given group and lowers the bin count by the number of bins
+     * that were actually removed. A group left without bins is dropped from the map.
+     *
+     * @param key the group identifier whose list holds the bins
+     * @param bins the bins to remove
+     */
+    private void removeBins(final String key, final List<RecordBin> bins) {
+        lock.lock();
+        try {
+            final List<RecordBin> groupBins = groupBinMap.get(key);
+            if (groupBins == null) {
+                return;
+            }
+
+            final int sizeBefore = groupBins.size();
+            groupBins.removeAll(bins);
+
+            // Only count bins that were really in the list, so the counter cannot drift.
+            final int removedCount = sizeBefore - groupBins.size();
+            binCount.addAndGet(-removedCount);
+
+            if (groupBins.isEmpty()) {
+                groupBinMap.remove(key);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }