You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/10/09 13:20:00 UTC

[2/2] nifi git commit: NIFI-5533: Be more efficient with heap utilization - Updated FlowFile Repo / Write Ahead Log so that any update that writes more than a configurable threshold of data (5 MB by default) is written to a file inside the FlowFile Repo rather than being buffered in memory

NIFI-5533: Be more efficient with heap utilization
 - Updated FlowFile Repo / Write Ahead Log so that any update that writes more than a configurable threshold of data (5 MB by default) is written to an 'overflow' file inside the FlowFile Repo rather than being buffered in memory
 - Updated SplitText so that it does not hold in heap any FlowFiles that are not the latest version. Holding those older versions prevents them from being garbage collected: while the Process Session holds the latest version of a FlowFile, SplitText would hold an older version, resulting in two copies of the same FlowFile object in heap

NIFI-5533: Checkpoint

NIFI-5533: Bug Fixes

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2974


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c425bd28
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c425bd28
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c425bd28

Branch: refs/heads/master
Commit: c425bd2880dc2c45c96e0dfcc4990f1e20e14d0a
Parents: c87d791
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Aug 17 14:08:14 2018 -0400
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Oct 9 09:18:02 2018 -0400

----------------------------------------------------------------------
 .../language/StandardPreparedQuery.java         |  21 +-
 .../nifi/repository/schema/RecordIterator.java  |  28 +++
 .../repository/schema/SchemaRecordReader.java   |  68 +++++-
 .../repository/schema/SchemaRecordWriter.java   |   9 +-
 .../repository/schema/SingleRecordIterator.java |  45 ++++
 .../nifi/wali/LengthDelimitedJournal.java       | 119 ++++++++++-
 .../wali/SequentialAccessWriteAheadLog.java     |   6 +-
 .../org/apache/nifi/wali/WriteAheadJournal.java |   5 +
 .../org/wali/MinimalLockingWriteAheadLog.java   |   3 +
 .../src/main/java/org/wali/SerDe.java           |  34 +++
 .../java/org/wali/WriteAheadRepository.java     |   2 +-
 .../wali/TestSequentialAccessWriteAheadLog.java |  89 ++++++--
 .../test/java/org/wali/DummyRecordSerde.java    |  83 ++++++++
 .../WriteAvroSchemaAttributeStrategy.java       |  22 +-
 .../repository/SchemaRepositoryRecordSerde.java |  71 ++++++-
 .../repository/FileSystemRepository.java        |   2 +-
 .../repository/StandardProcessSession.java      | 210 ++++++++++---------
 .../nifi/processor/StandardProcessContext.java  |  36 +++-
 .../repository/StandardRepositoryRecord.java    |  45 +++-
 .../nifi/processors/standard/SplitText.java     |  87 ++++----
 .../processors/attributes/UpdateAttribute.java  |  69 ++++--
 21 files changed, 828 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
index cf90d8d..fc3f9b7 100644
--- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
+++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java
@@ -16,12 +16,6 @@
  */
 package org.apache.nifi.attribute.expression.language;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.nifi.attribute.expression.language.evaluation.Evaluator;
 import org.apache.nifi.attribute.expression.language.evaluation.literals.StringLiteralEvaluator;
 import org.apache.nifi.attribute.expression.language.evaluation.selection.AllAttributesEvaluator;
@@ -34,7 +28,14 @@ import org.apache.nifi.attribute.expression.language.evaluation.selection.MultiN
 import org.apache.nifi.expression.AttributeValueDecorator;
 import org.apache.nifi.processor.exception.ProcessException;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class StandardPreparedQuery implements PreparedQuery {
+    private static final String EMPTY_STRING = "";
 
     private final List<Expression> expressions;
     private volatile VariableImpact variableImpact;
@@ -45,6 +46,14 @@ public class StandardPreparedQuery implements PreparedQuery {
 
     @Override
     public String evaluateExpressions(final Map<String, String> valMap, final AttributeValueDecorator decorator, final Map<String, String> stateVariables) throws ProcessException {
+        if (expressions.isEmpty()) {
+            return EMPTY_STRING;
+        }
+        if (expressions.size() == 1) {
+            final String evaluated = expressions.get(0).evaluate(valMap, decorator, stateVariables);
+            return evaluated == null ? EMPTY_STRING : evaluated;
+        }
+
         final StringBuilder sb = new StringBuilder();
 
         for (final Expression expression : expressions) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordIterator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordIterator.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordIterator.java
new file mode 100644
index 0000000..de35cd5
--- /dev/null
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.repository.schema;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface RecordIterator extends Closeable {
+
+    Record next() throws IOException;
+
+    boolean isNext() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
index 84f3532..daedf37 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
@@ -17,8 +17,11 @@
 
 package org.apache.nifi.repository.schema;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -30,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-
 public class SchemaRecordReader {
     private final RecordSchema schema;
 
@@ -56,15 +58,24 @@ public class SchemaRecordReader {
     }
 
     public Record readRecord(final InputStream in) throws IOException {
-        final int sentinelByte = in.read();
-        if (sentinelByte < 0) {
+        final int recordIndicator = in.read();
+        if (recordIndicator < 0) {
             return null;
         }
 
-        if (sentinelByte != 1) {
-            throw new IOException("Expected to read a Sentinel Byte of '1' but got a value of '" + sentinelByte + "' instead");
+        if (recordIndicator == SchemaRecordWriter.EXTERNAL_FILE_INDICATOR) {
+            throw new IOException("Expected to read a Sentinel Byte of '1' indicating that the next record is inline but the Sentinel value was '" + SchemaRecordWriter.EXTERNAL_FILE_INDICATOR
+                + ", indicating that data was written to an External File. This data cannot be recovered via calls to #readRecord(InputStream) but must be recovered via #readRecords(InputStream)");
+        }
+
+        if (recordIndicator != 1) {
+            throw new IOException("Expected to read a Sentinel Byte of '1' but got a value of '" + recordIndicator + "' instead");
         }
 
+        return readInlineRecord(in);
+    }
+
+    private Record readInlineRecord(final InputStream in) throws IOException {
         final List<RecordField> schemaFields = schema.getFields();
         final Map<RecordField, Object> fields = new HashMap<>(schemaFields.size());
 
@@ -76,6 +87,53 @@ public class SchemaRecordReader {
         return new FieldMapRecord(fields, schema);
     }
 
+    public RecordIterator readRecords(final InputStream in) throws IOException {
+        final int recordIndicator = in.read();
+        if (recordIndicator < 0) {
+            return null;
+        }
+
+        if (recordIndicator == SchemaRecordWriter.INLINE_RECORD_INDICATOR) {
+            final Record nextRecord = readInlineRecord(in);
+            return new SingleRecordIterator(nextRecord);
+        }
+
+        if (recordIndicator != SchemaRecordWriter.EXTERNAL_FILE_INDICATOR) {
+            throw new IOException("Expected to read a Sentinel Byte of '" + SchemaRecordWriter.INLINE_RECORD_INDICATOR + "' or '" + SchemaRecordWriter.EXTERNAL_FILE_INDICATOR
+                + "' but encountered a value of '" + recordIndicator + "' instead");
+        }
+
+        final DataInputStream dis = new DataInputStream(in);
+        final String externalFilename = dis.readUTF();
+        final File externalFile = new File(externalFilename);
+        final FileInputStream fis = new FileInputStream(externalFile);
+        final InputStream bufferedIn = new BufferedInputStream(fis);
+
+        final RecordIterator recordIterator = new RecordIterator() {
+            @Override
+            public Record next() throws IOException {
+                return readRecord(bufferedIn);
+            }
+
+            @Override
+            public boolean isNext() throws IOException {
+                bufferedIn.mark(1);
+                final int nextByte = bufferedIn.read();
+                bufferedIn.reset();
+
+                return (nextByte > -1);
+            }
+
+            @Override
+            public void close() throws IOException {
+                bufferedIn.close();
+            }
+        };
+
+        return recordIterator;
+    }
+
+
 
     private Object readField(final InputStream in, final RecordField field) throws IOException {
         switch (field.getRepetition()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
index 67d558a..d65e60b 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordWriter.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
@@ -30,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 
 public class SchemaRecordWriter {
+    static final int INLINE_RECORD_INDICATOR = 1;
+    static final int EXTERNAL_FILE_INDICATOR = 8;
 
     public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
 
@@ -41,7 +44,7 @@ public class SchemaRecordWriter {
         // write sentinel value to indicate that there is a record. This allows the reader to then read one
         // byte and check if -1. If so, the reader knows there are no more records. If not, then the reader
         // knows that it should be able to continue reading.
-        out.write(1);
+        out.write(INLINE_RECORD_INDICATOR);
 
         final byte[] buffer = byteArrayCache.checkOut();
         try {
@@ -226,4 +229,8 @@ public class SchemaRecordWriter {
         return charsInOriginal;
     }
 
+    public void writeExternalFileReference(final DataOutputStream out, final File externalFile) throws IOException {
+        out.write(EXTERNAL_FILE_INDICATOR);
+        out.writeUTF(externalFile.getAbsolutePath());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SingleRecordIterator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SingleRecordIterator.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SingleRecordIterator.java
new file mode 100644
index 0000000..cc007fc
--- /dev/null
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SingleRecordIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.repository.schema;
+
+public class SingleRecordIterator implements RecordIterator {
+    private final Record record;
+    private boolean consumed = false;
+
+    public SingleRecordIterator(final Record record) {
+        this.record = record;
+    }
+
+    @Override
+    public Record next() {
+        if (consumed) {
+            return null;
+        }
+
+        consumed = true;
+        return record;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean isNext() {
+        return !consumed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
index c10d366..d9fdc97 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java
@@ -39,15 +39,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.text.DecimalFormat;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
     private static final Logger logger = LoggerFactory.getLogger(LengthDelimitedJournal.class);
+    private static final int DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES = 5 * 1024 * 1024; // 5 MB
+
     private static final JournalSummary INACTIVE_JOURNAL_SUMMARY = new StandardJournalSummary(-1L, -1L, 0);
     private static final int JOURNAL_ENCODING_VERSION = 1;
     private static final byte TRANSACTION_FOLLOWS = 64;
@@ -55,9 +59,11 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
     private static final int NUL_BYTE = 0;
 
     private final File journalFile;
+    private final File overflowDirectory;
     private final long initialTransactionId;
     private final SerDeFactory<T> serdeFactory;
     private final ObjectPool<ByteArrayDataOutputStream> streamPool;
+    private final int maxInHeapSerializationBytes;
 
     private SerDe<T> serde;
     private FileOutputStream fileOut;
@@ -72,13 +78,56 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
     private final ByteBuffer transactionPreamble = ByteBuffer.allocate(12); // guarded by synchronized block
 
     public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId) {
+        this(journalFile, serdeFactory, streamPool, initialTransactionId, DEFAULT_MAX_IN_HEAP_SERIALIZATION_BYTES);
+    }
+
+    public LengthDelimitedJournal(final File journalFile, final SerDeFactory<T> serdeFactory, final ObjectPool<ByteArrayDataOutputStream> streamPool, final long initialTransactionId,
+                                  final int maxInHeapSerializationBytes) {
         this.journalFile = journalFile;
+        this.overflowDirectory = new File(journalFile.getParentFile(), "overflow-" + getBaseFilename(journalFile));
         this.serdeFactory = serdeFactory;
         this.serde = serdeFactory.createSerDe(null);
         this.streamPool = streamPool;
 
         this.initialTransactionId = initialTransactionId;
         this.currentTransactionId = initialTransactionId;
+        this.maxInHeapSerializationBytes = maxInHeapSerializationBytes;
+    }
+
+    public void dispose() {
+        logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", journalFile.getName());
+        if (!journalFile.delete() && journalFile.exists()) {
+            logger.warn("Unable to delete expired journal file " + journalFile + "; this file should be deleted manually.");
+        }
+
+        if (overflowDirectory.exists()) {
+            final File[] overflowFiles = overflowDirectory.listFiles();
+            if (overflowFiles == null) {
+                logger.warn("Unable to obtain listing of files that exist in 'overflow directory' " + overflowDirectory
+                    + " - this directory and any files within it can now be safely removed manually");
+                return;
+            }
+
+            for (final File overflowFile : overflowFiles) {
+                if (!overflowFile.delete() && overflowFile.exists()) {
+                    logger.warn("After expiring journal file " + journalFile + ", unable to remove 'overflow file' " + overflowFile + " - this file should be removed manually");
+                }
+            }
+
+            if (!overflowDirectory.delete()) {
+                logger.warn("After expiring journal file " + journalFile + ", unable to remove 'overflow directory' " + overflowDirectory + " - this file should be removed manually");
+            }
+        }
+    }
+
+    private static String getBaseFilename(final File file) {
+        final String name = file.getName();
+        final int index = name.lastIndexOf(".");
+        if (index < 0) {
+            return name;
+        }
+
+        return name.substring(0, index);
     }
 
     private synchronized OutputStream getOutputStream() throws FileNotFoundException {
@@ -181,12 +230,64 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
 
         checkState();
 
+        File overflowFile = null;
         final ByteArrayDataOutputStream bados = streamPool.borrowObject();
+
         try {
-            for (final T record : records) {
-                final Object recordId = serde.getRecordIdentifier(record);
-                final T previousRecordState = recordLookup.lookup(recordId);
-                serde.serializeEdit(previousRecordState, record, bados.getDataOutputStream());
+            FileOutputStream overflowFileOut = null;
+
+            try {
+                DataOutputStream dataOut = bados.getDataOutputStream();
+                for (final T record : records) {
+                    final Object recordId = serde.getRecordIdentifier(record);
+                    final T previousRecordState = recordLookup.lookup(recordId);
+                    serde.serializeEdit(previousRecordState, record, dataOut);
+
+                    final int size = bados.getByteArrayOutputStream().size();
+                    if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) {
+                        if (!overflowDirectory.exists()) {
+                            Files.createDirectory(overflowDirectory.toPath());
+                        }
+
+                        // If we have exceeded our threshold for how much to serialize in memory,
+                        // flush the in-memory representation to an 'overflow file' and then update
+                        // the Data Output Stream that is used to write to the file also.
+                        overflowFile = new File(overflowDirectory, UUID.randomUUID().toString());
+                        logger.debug("Length of update with {} records exceeds in-memory max of {} bytes. Overflowing to {}", records.size(), maxInHeapSerializationBytes, overflowFile);
+
+                        overflowFileOut = new FileOutputStream(overflowFile);
+                        bados.getByteArrayOutputStream().writeTo(overflowFileOut);
+                        bados.getByteArrayOutputStream().reset();
+
+                        // change dataOut to point to the File's Output Stream so that all subsequent records are written to the file.
+                        dataOut = new DataOutputStream(new BufferedOutputStream(overflowFileOut));
+
+                        // We now need to write to the ByteArrayOutputStream a pointer to the overflow file
+                        // so that what is written to the actual journal is that pointer.
+                        serde.writeExternalFileReference(overflowFile, bados.getDataOutputStream());
+                    }
+                }
+
+                dataOut.flush();
+
+                // If we overflowed to an external file, we need to be sure that we sync to disk before
+                // updating the Journal. Otherwise, we could get to a state where the Journal was flushed to disk without the
+                // external file being flushed. This would result in a missed update to the FlowFile Repository.
+                if (overflowFileOut != null) {
+                    if (logger.isDebugEnabled()) { // avoid calling File.length() if not necessary
+                        logger.debug("Length of update to overflow file is {} bytes", overflowFile.length());
+                    }
+
+                    overflowFileOut.getFD().sync();
+                }
+            } finally {
+                if (overflowFileOut != null) {
+                    try {
+                        overflowFileOut.close();
+                    } catch (final Exception e) {
+                        logger.warn("Failed to close open file handle to overflow file {}", overflowFile, e);
+                    }
+                }
             }
 
             final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
@@ -210,12 +311,20 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
             logger.debug("Wrote Transaction {} to journal {} with length {} and {} records", transactionId, journalFile, baos.size(), records.size());
         } catch (final Throwable t) {
             poison(t);
+
+            if (overflowFile != null) {
+                if (!overflowFile.delete() && overflowFile.exists()) {
+                    logger.warn("Failed to cleanup temporary overflow file " + overflowFile + " - this file should be cleaned up manually.");
+                }
+            }
+
             throw t;
         } finally {
             streamPool.returnObject(bados);
         }
     }
 
+
     private void checkState() throws IOException {
         if (poisoned) {
             throw new IOException("Cannot update journal file " + journalFile + " because this journal has already encountered a failure when attempting to write to the file. "
@@ -335,7 +444,7 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
                     final ByteCountingInputStream transactionByteCountingIn = new ByteCountingInputStream(transactionLimitingIn);
                     final DataInputStream transactionDis = new DataInputStream(transactionByteCountingIn);
 
-                    while (transactionByteCountingIn.getBytesConsumed() < transactionLength) {
+                    while (transactionByteCountingIn.getBytesConsumed() < transactionLength || serde.isMoreInExternalFile()) {
                         final T record = serde.deserializeEdit(transactionDis, recordMap, serdeAndVersion.getVersion());
 
                         // Update our RecordMap so that we have the most up-to-date version of the Record.

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index cba5184..11eb31c 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -300,10 +300,8 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
         snapshot.writeSnapshot(snapshotCapture);
 
         for (final File existingJournal : existingJournals) {
-            logger.debug("Deleting Journal {} because it is now encapsulated in the latest Snapshot", existingJournal.getName());
-            if (!existingJournal.delete() && existingJournal.exists()) {
-                logger.warn("Unable to delete expired journal file " + existingJournal + "; this file should be deleted manually.");
-            }
+            final WriteAheadJournal journal = new LengthDelimitedJournal<>(existingJournal, serdeFactory, streamPool, nextTransactionId);
+            journal.dispose();
         }
 
         final long totalNanos = System.nanoTime() - startNanos;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
index c44e1cb..d4fb6cb 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadJournal.java
@@ -53,4 +53,9 @@ public interface WriteAheadJournal<T> extends Closeable {
      * @return <code>true</code> if the journal is healthy and can be written to, <code>false</code> if either the journal has been closed or is poisoned
      */
     boolean isHealthy();
+
+    /**
+     * Destroys any resources that the journal occupies
+     */
+    void dispose();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index eabac9d..8db2d87 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -1103,6 +1103,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                 final S record;
                 try {
                     record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
+                    if (record == null) {
+                        throw new EOFException();
+                    }
                 } catch (final EOFException eof) {
                     throw eof;
                 } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
index d1919e7..356cf84 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
@@ -18,6 +18,7 @@ package org.wali;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
@@ -151,4 +152,37 @@ public interface SerDe<T> {
      */
     default void close() throws IOException {
     }
+
+    /**
+     * Optional operation: writes to the given stream a reference indicating that the contents of the next update
+     * are to be found in the given external File rather than inline.
+     *
+     * @param externalFile the file that contains the update information
+     * @param out the DataOutputStream to write the external file reference to
+     * @throws IOException if unable to write the external file reference
+     * @throws UnsupportedOperationException if this SerDe does not support this operation
+     */
+    default void writeExternalFileReference(File externalFile, DataOutputStream out) throws IOException {
+        throw new UnsupportedOperationException(); // default: not supported; see isWriteExternalFileReferenceSupported()
+    }
+
+    /**
+     * Indicates whether or not a call to {@link #writeExternalFileReference(File, DataOutputStream)} is valid for this implementation.
+     * @return <code>true</code> if calls to {@link #writeExternalFileReference(File, DataOutputStream)} are supported, <code>false</code> if calling
+     * the method will result in an {@link UnsupportedOperationException} being thrown.
+     */
+    default boolean isWriteExternalFileReferenceSupported() {
+        return false;
+    }
+
+    /**
+     * If the last call to read data from this SerDe resulted in data being read from an External File, and there is more data in that External File,
+     * then this method will return <code>true</code>. Otherwise, it will return <code>false</code>.
+     *
+     * @return <code>true</code> if more data available in External File, <code>false</code> otherwise.
+     * @throws IOException if unable to read from External File to determine data availability
+     */
+    default boolean isMoreInExternalFile() throws IOException {
+        return false; // default: this SerDe never reads from External Files
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
index 7f0e828..05fc8a5 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java
@@ -89,7 +89,7 @@ public interface WriteAheadRepository<T> {
      * <p>
      * Recovers all External Swap locations that were persisted. If this method
      * is to be called, it must be called AFTER {@link #recoverRecords()} and
-     * BEFORE {@link update}.
+     * BEFORE {@link #update(Collection, boolean)}.
      * </p>
      *
      * @return swap location

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
index 4fc0fe7..6d24445 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestSequentialAccessWriteAheadLog.java
@@ -17,10 +17,17 @@
 
 package org.apache.nifi.wali;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.wali.DummyRecord;
+import org.wali.DummyRecordSerde;
+import org.wali.SerDeFactory;
+import org.wali.SingletonSerDeFactory;
+import org.wali.UpdateType;
+import org.wali.WriteAheadRepository;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,22 +45,69 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.wali.DummyRecord;
-import org.wali.DummyRecordSerde;
-import org.wali.SerDeFactory;
-import org.wali.SingletonSerDeFactory;
-import org.wali.UpdateType;
-import org.wali.WriteAheadRepository;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestSequentialAccessWriteAheadLog {
     @Rule
     public TestName testName = new TestName();
 
+
+    @Test
+    public void testUpdateWithExternalFile() throws IOException {
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(serde);
+        // Build a single update large enough that it is expected to be written to an external file (asserted below).
+        final List<DummyRecord> records = new ArrayList<>();
+        for (int i = 0; i < 350_000; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
+            records.add(record);
+        }
+
+        repo.update(records, false);
+        repo.shutdown();
+        // Exactly one external file reference should have been written for the single large update.
+        assertEquals(1, serde.getExternalFileReferences().size());
+        // Recover with a fresh repository instance and compare against what was written.
+        final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
+        final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
+
+        // ensure that we get the same records back, but the order may be different, so wrap both collections
+        // in a HashSet so that we can compare unordered collections of the same type.
+        assertEquals(new HashSet<>(records), new HashSet<>(recovered));
+    }
+
+    @Test
+    public void testUpdateWithExternalFileFollowedByInlineUpdate() throws IOException {
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo(serde);
+        // First update: large enough that it is expected to be written to an external file.
+        final List<DummyRecord> records = new ArrayList<>();
+        for (int i = 0; i < 350_000; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
+            records.add(record);
+        }
+
+        repo.update(records, false);
+        // Second update: a single record, expected inline (only one external file reference is asserted below).
+        final DummyRecord subsequentRecord = new DummyRecord("350001", UpdateType.CREATE);
+        repo.update(Collections.singleton(subsequentRecord), false);
+        repo.shutdown();
+
+        assertEquals(1, serde.getExternalFileReferences().size());
+        // Recovery must return the records from both the external-file update and the inline update.
+        final SequentialAccessWriteAheadLog<DummyRecord> recoveryRepo = createRecoveryRepo();
+        final Collection<DummyRecord> recovered = recoveryRepo.recoverRecords();
+
+        // ensure that we get the same records back, but the order may be different, so wrap both collections
+        // in a HashSet so that we can compare unordered collections of the same type.
+        final Set<DummyRecord> expectedRecords = new HashSet<>(records);
+        expectedRecords.add(subsequentRecord);
+        assertEquals(expectedRecords, new HashSet<>(recovered));
+    }
+
     @Test
     public void testRecoverWithNoCheckpoint() throws IOException {
         final SequentialAccessWriteAheadLog<DummyRecord> repo = createWriteRepo();
@@ -145,12 +199,15 @@ public class TestSequentialAccessWriteAheadLog {
     }
 
     private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo() throws IOException {
+        return createWriteRepo(new DummyRecordSerde());
+    }
+
+    private SequentialAccessWriteAheadLog<DummyRecord> createWriteRepo(final DummyRecordSerde serde) throws IOException {
         final File targetDir = new File("target");
         final File storageDir = new File(targetDir, testName.getMethodName());
         deleteRecursively(storageDir);
         assertTrue(storageDir.mkdirs());
 
-        final DummyRecordSerde serde = new DummyRecordSerde();
         final SerDeFactory<DummyRecord> serdeFactory = new SingletonSerDeFactory<>(serde);
         final SequentialAccessWriteAheadLog<DummyRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
index 1f6aede..9203493 100644
--- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
+++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java
@@ -16,17 +16,31 @@
  */
 package org.wali;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class DummyRecordSerde implements SerDe<DummyRecord> {
+    private static final int INLINE_RECORD_INDICATOR = 1;
+    private static final int EXTERNAL_FILE_INDICATOR = 8;
 
     private int throwIOEAfterNserializeEdits = -1;
     private int throwOOMEAfterNserializeEdits = -1;
     private int serializeEditCount = 0;
 
+    private final Set<File> externalFilesWritten = new HashSet<>();
+    private Queue<DummyRecord> externalRecords;
+
     @SuppressWarnings("fallthrough")
     @Override
     public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException {
@@ -37,6 +51,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
             throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME");
         }
 
+        out.write(INLINE_RECORD_INDICATOR);
         out.writeUTF(record.getUpdateType().name());
         out.writeUTF(record.getId());
 
@@ -72,6 +87,57 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
     @Override
     @SuppressWarnings("fallthrough")
     public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
+        if (externalRecords != null) {
+            final DummyRecord record = externalRecords.poll();
+            if (record != null) {
+                return record;
+            }
+
+            externalRecords = null;
+        }
+
+        final int recordLocationIndicator = in.read();
+        if (recordLocationIndicator == EXTERNAL_FILE_INDICATOR) {
+            final String externalFilename = in.readUTF();
+            final File externalFile = new File(externalFilename);
+
+            try (final InputStream fis = new FileInputStream(externalFile);
+                 final InputStream bufferedIn = new BufferedInputStream(fis);
+                 final DataInputStream dis = new DataInputStream(bufferedIn)) {
+
+                externalRecords = new LinkedBlockingQueue<>();
+
+                DummyRecord record;
+                while ((record = deserializeRecordInline(dis, version, true)) != null) {
+                    externalRecords.offer(record);
+                }
+
+                return externalRecords.poll();
+            }
+        } else if (recordLocationIndicator == INLINE_RECORD_INDICATOR) {
+            return deserializeRecordInline(in, version, false);
+        } else {
+            throw new IOException("Encountered invalid record location indicator: " + recordLocationIndicator);
+        }
+    }
+
+    @Override
+    public boolean isMoreInExternalFile() {
+        return externalRecords != null && !externalRecords.isEmpty(); // records from the last external file not yet returned
+    }
+
+    private DummyRecord deserializeRecordInline(final DataInputStream in, final int version, final boolean expectInlineRecordIndicator) throws IOException {
+        if (expectInlineRecordIndicator) {
+            final int locationIndicator = in.read();
+            if (locationIndicator < 0) {
+                return null;
+            }
+
+            if (locationIndicator != INLINE_RECORD_INDICATOR) {
+                throw new IOException("Expected inline record indicator but encountered " + locationIndicator);
+            }
+        }
+
         final String updateTypeName = in.readUTF();
         final UpdateType updateType = UpdateType.valueOf(updateTypeName);
         final String id = in.readUTF();
@@ -135,4 +201,21 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
     public String getLocation(final DummyRecord record) {
         return record.getSwapLocation();
     }
+
+    @Override
+    public boolean isWriteExternalFileReferenceSupported() { // this test SerDe can reference external files
+        return true;
+    }
+
+    @Override
+    public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
+        out.write(EXTERNAL_FILE_INDICATOR); // marker byte that deserializeRecord() checks for
+        out.writeUTF(externalFile.getAbsolutePath());
+        // Remember every referenced file so tests can assert how many external files were used.
+        externalFilesWritten.add(externalFile);
+    }
+
+    public Set<File> getExternalFileReferences() { // read-only view of files passed to writeExternalFileReference()
+        return Collections.unmodifiableSet(externalFilesWritten);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
index 5f94679..36484a5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.RecordSchema;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
@@ -26,10 +30,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import org.apache.avro.Schema;
-import org.apache.nifi.avro.AvroTypeUtil;
-import org.apache.nifi.serialization.record.RecordSchema;
-
 public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
     private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
         @Override
@@ -53,11 +53,21 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
             }
         }
 
-        String schemaText = avroSchemaTextCache.get(schema);
+        String schemaText;
+        synchronized (avroSchemaTextCache) {
+            schemaText = avroSchemaTextCache.get(schema);
+        }
+
         if (schemaText == null) {
             final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
             schemaText = avroSchema.toString();
-            avroSchemaTextCache.put(schema, schemaText);
+
+            synchronized (avroSchemaTextCache) {
+                final String existing = avroSchemaTextCache.putIfAbsent(schema, schemaText);
+                if (existing != null) {
+                    schemaText = existing;
+                }
+            }
         }
 
         return Collections.singletonMap("avro.schema", schemaText);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
index 970d45e..0013846 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.controller.repository;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@@ -34,6 +28,7 @@ import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
 import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
 import org.apache.nifi.repository.schema.FieldType;
 import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordIterator;
 import org.apache.nifi.repository.schema.RecordSchema;
 import org.apache.nifi.repository.schema.Repetition;
 import org.apache.nifi.repository.schema.SchemaRecordReader;
@@ -43,6 +38,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wali.SerDe;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
 public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
     private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
     private static final int MAX_ENCODING_VERSION = 2;
@@ -51,7 +53,8 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
     private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
 
     private final ResourceClaimManager resourceClaimManager;
-    private volatile RecordSchema recoverySchema;
+    private volatile SchemaRecordReader reader;
+    private RecordIterator recordIterator = null;
 
     public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
         this.resourceClaimManager = resourceClaimManager;
@@ -101,7 +104,8 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
 
     @Override
     public void readHeader(final DataInputStream in) throws IOException {
-        recoverySchema = RecordSchema.readFrom(in);
+        final RecordSchema recoverySchema = RecordSchema.readFrom(in);
+        reader = SchemaRecordReader.fromSchema(recoverySchema); // built once per header and reused by deserializeRecord()
     }
 
     @Override
@@ -120,8 +124,41 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
 
     @Override
     public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
-        final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
-        final Record updateRecord = reader.readRecord(in);
+        if (recordIterator != null) { // serve remaining records from the current iterator (possibly an external file) first
+            final RepositoryRecord record = nextRecord();
+            if (record != null) {
+                return record;
+            }
+            recordIterator.close();
+            recordIterator = null; // don't leave a closed iterator in place if readRecords() below throws
+        }
+
+        recordIterator = reader.readRecords(in);
+        if (recordIterator == null) {
+            return null;
+        }
+
+        return nextRecord();
+    }
+
+    /** Returns the next record from the current iterator; on a read failure the iterator is closed and discarded. */
+    private RepositoryRecord nextRecord() throws IOException {
+        final Record record;
+        try {
+            record = recordIterator.next();
+        } catch (final Exception e) {
+            try {
+                recordIterator.close();
+            } catch (final Exception closeFailure) {
+                e.addSuppressed(closeFailure); // keep the original read failure as the primary exception
+            }
+            recordIterator = null;
+            throw e;
+        }
+        return record == null ? null : createRepositoryRecord(record);
+    }
+
+    private RepositoryRecord createRepositoryRecord(final Record updateRecord) throws IOException {
         if (updateRecord == null) {
             // null may be returned by reader.readRecord() if it encounters end-of-stream
             return null;
@@ -246,4 +283,18 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
         return MAX_ENCODING_VERSION;
     }
 
+    @Override
+    public boolean isWriteExternalFileReferenceSupported() { // this SerDe can write external file references
+        return true;
+    }
+
+    @Override
+    public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException {
+        new SchemaRecordWriter().writeExternalFileReference(out, externalFile); // note: SchemaRecordWriter takes (out, file)
+    }
+
+    @Override
+    public boolean isMoreInExternalFile() throws IOException {
+        return recordIterator != null && recordIterator.isNext(); // true while the current RecordIterator reports more records
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c425bd28/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 256dba9..c041f5c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -578,7 +578,7 @@ public class FileSystemRepository implements ContentRepository {
             }
 
             final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
-            final String section = String.valueOf(modulatedSectionIndex);
+            final String section = String.valueOf(modulatedSectionIndex).intern();
             final String claimId = System.currentTimeMillis() + "-" + currentIndex;
 
             resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true);