You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/02/25 05:57:25 UTC

[nifi] 16/24: NIFI-7646, NIFI-8222: Instead of having StandardProcessSession call ContentRepository.read(ContentClaim), introduced a new ContentRepository.read(ResourceClaim) and hold open the InputStream to the ResourceClaim. This can't be supported by EncryptedContentRepository, so introduced a method to allow using this or not. The benefit here is that when we have many FlowFiles read within a session, such as when using MergeContent/MergeRecord or a processor configured with a Run Duration, we can h [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.13
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ae4bc015c8defe39d0aa64f0e62c6f7f37881b69
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jul 17 09:12:47 2020 -0400

    NIFI-7646, NIFI-8222: Instead of having StandardProcessSession call ContentRepository.read(ContentClaim), introduced a new ContentRepository.read(ResourceClaim) and hold open the InputStream to the ResourceClaim. This can't be supported by EncryptedContentRepository, so introduced a method to allow using this or not. The benefit here is that when we have many FlowFiles read within a session, such as when using MergeContent/MergeRecord or a processor configured with a Run Duration, we  [...]
    - Instead of entering a 'synchronized' block for every provenance event, serialize up to 1 MB worth of data, and then enter synchronized block to write that data out. This avoids large amounts of lock contention and context switches
    
    NIFI-7646: Removed TODO and unused Jackson dependency
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4818
---
 .../apache/nifi/stream/io/LimitingInputStream.java |   7 +
 .../controller/repository/ContentRepository.java   |  18 ++
 .../repository/StandardProcessSession.java         |  47 +++--
 .../repository/io/ContentClaimInputStream.java     |  10 +-
 .../repository/FileSystemRepository.java           |  16 ++
 .../repository/VolatileContentRepository.java      |   5 +
 .../crypto/EncryptedFileSystemRepository.java      |  26 ++-
 .../EncryptedFileSystemRepositoryTest.groovy       |   2 +-
 .../repository/StandardProcessSessionIT.java       |  21 ++
 .../repository/ByteArrayContentRepository.java     |  14 ++
 .../provenance/EncryptedSchemaRecordWriter.java    | 133 ++----------
 .../EncryptedWriteAheadProvenanceRepository.java   |  12 +-
 .../provenance/EventIdFirstSchemaRecordWriter.java | 232 ++++++++-------------
 .../provenance/WriteAheadProvenanceRepository.java |   2 +
 .../serialization/CompressableRecordWriter.java    |  19 +-
 .../provenance/serialization/RecordReaders.java    |  23 +-
 .../provenance/serialization/RecordWriter.java     |  19 +-
 .../provenance/store/WriteAheadStorePartition.java |  10 +-
 .../provenance/util/ByteArrayDataOutputStream.java |  39 ----
 .../util/ByteArrayDataOutputStreamCache.java       |  52 -----
 .../EncryptedSchemaRecordReaderWriterTest.groovy   |   6 +-
 .../provenance/AbstractTestRecordReaderWriter.java |  44 ++--
 .../TestEventIdFirstSchemaRecordReaderWriter.java  |  48 ++---
 23 files changed, 344 insertions(+), 461 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
index 70c6a32..df8391a 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -25,6 +25,7 @@ public class LimitingInputStream extends InputStream {
     private final long limit;
     private long bytesRead = 0;
     private volatile boolean limitReached = false;
+    private long markOffset = -1L;
 
     /**
      * Constructs a limited input stream whereby if the limit is reached all
@@ -112,6 +113,7 @@ public class LimitingInputStream extends InputStream {
     @Override
     public void mark(int readlimit) {
         in.mark(readlimit);
+        markOffset = bytesRead;
     }
 
     @Override
@@ -122,6 +124,11 @@ public class LimitingInputStream extends InputStream {
     @Override
     public void reset() throws IOException {
         in.reset();
+
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+        markOffset = -1;
     }
 
     public long getLimit() {
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index 7636966..97dd100 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -253,6 +253,24 @@ public interface ContentRepository {
     InputStream read(ContentClaim claim) throws IOException;
 
     /**
+     * Provides access to the input stream for the entire Resource Claim
+     * @param claim the resource claim to read from
+     * @return InputStream over the content of the entire Resource Claim
+     * @throws IOException if unable to read
+     */
+    InputStream read(ResourceClaim claim) throws IOException;
+
+    /**
+     * Indicates whether or not this Content Repository supports obtaining an InputStream for
+     * an entire Resource Claim. If this method returns <code>false</code>, the {@link #read(ResourceClaim)} should not
+     * be called and instead {@link #read(ContentClaim)} should always be used
+     * @return <code>true</code> if reading an entire Resource Claim is allowed, <code>false</code> otherwise
+     */
+    default boolean isResourceClaimStreamSupported() {
+        return true;
+    }
+
+    /**
      * Obtains an OutputStream to the content for the given claim.
      *
      * @param claim to write to
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0cdfd05..1d24f33 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -59,10 +59,12 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
@@ -145,8 +147,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
     private int flowFilesIn = 0, flowFilesOut = 0;
     private long contentSizeIn = 0L, contentSizeOut = 0L;
 
-    private ContentClaim currentReadClaim = null;
-    private ContentClaimInputStream currentReadClaimStream = null;
+    private ResourceClaim currentReadClaim = null;
+    private ByteCountingInputStream currentReadClaimStream = null;
     private long processingStartTime;
 
     // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed
@@ -2259,12 +2261,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             context.getProvenanceRepository().registerEvents(iterable);
             context.getFlowFileRepository().updateRepository(expiredRecords);
         } catch (final IOException e) {
-            LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
+            LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e.toString(), e);
         }
 
     }
 
-    private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset, final boolean allowCachingOfStream) throws ContentNotFoundException {
+    private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long contentClaimOffset, final boolean allowCachingOfStream) throws ContentNotFoundException {
         // If there's no content, don't bother going to the Content Repository because it is generally expensive and we know
         // that there is no actual content.
         if (flowFile.getSize() == 0L) {
@@ -2275,15 +2277,18 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             // If the recursion set is empty, we can use the same input stream that we already have open. However, if
             // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
             // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
-            if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
-                if (currentReadClaim == claim) {
-                    if (currentReadClaimStream != null && currentReadClaimStream.getCurrentOffset() <= offset) {
-                        final long bytesToSkip = offset - currentReadClaimStream.getCurrentOffset();
+            if (allowCachingOfStream && readRecursionSet.isEmpty() && !writeRecursionSet.contains(flowFile) && context.getContentRepository().isResourceClaimStreamSupported()) {
+                if (currentReadClaim == claim.getResourceClaim()) {
+                    final long resourceClaimOffset = claim.getOffset() + contentClaimOffset;
+                    if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= resourceClaimOffset) {
+                        final long bytesToSkip = resourceClaimOffset - currentReadClaimStream.getBytesConsumed();
                         if (bytesToSkip > 0) {
                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
                         }
 
-                        return new DisableOnCloseInputStream(currentReadClaimStream);
+                        final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
+                        final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream);
+                        return contentClaimInputStream;
                     }
                 }
 
@@ -2293,17 +2298,25 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
                     currentReadClaimStream.close();
                 }
 
-                currentReadClaim = claim;
-                currentReadClaimStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
-
-                // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
-                // reuse the same InputStream for the next FlowFile
-                final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream);
-                return disableOnClose;
+                currentReadClaim = claim.getResourceClaim();
+                final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim());
+                StreamUtils.skip(contentRepoStream, claim.getOffset());
+                final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
+                final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset());
+                currentReadClaimStream = byteCountingInputStream;
+
+                // Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can
+                // reuse the same InputStream for the next FlowFile. We then need to use a LimitingInputStream to ensure that we don't allow the InputStream
+                // to be read past the end of the FlowFile (since multiple FlowFiles' contents may be in the given Resource Claim Input Stream).
+                // Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability
+                // without buffering data in memory.
+                final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
+                final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream);
+                return contentClaimInputStream;
             } else {
                 claimCache.flush(claim);
 
-                final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, offset);
+                final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset);
                 return rawInStream;
             }
         } catch (final ContentNotFoundException cnfe) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
index 94b9d2e..169f0e2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
@@ -39,11 +39,16 @@ public class ContentClaimInputStream extends InputStream {
     private long markOffset;
 
     public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) {
+        this(contentRepository, contentClaim, claimOffset, null);
+    }
+
+    public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate) {
         this.contentRepository = contentRepository;
         this.contentClaim = contentClaim;
         this.claimOffset = claimOffset;
 
         this.currentOffset = claimOffset;
+        this.delegate = initialDelegate;
     }
 
     private InputStream getDelegate() throws IOException {
@@ -132,7 +137,10 @@ public class ContentClaimInputStream extends InputStream {
         }
 
         if (currentOffset != markOffset) {
-            delegate.close();
+            if (delegate != null) {
+                delegate.close();
+            }
+
             formDelegate();
             StreamUtils.skip(delegate, markOffset - claimOffset);
             currentOffset = markOffset;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 498852a..71e7d6c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -552,6 +552,11 @@ public class FileSystemRepository implements ContentRepository {
         return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
     }
 
+    public Path getPath(final ResourceClaim resourceClaim, final boolean verifyExists) throws ContentNotFoundException {
+        final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
+        return getPath(contentClaim, verifyExists);
+    }
+
     public Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
         final ResourceClaim resourceClaim = claim.getResourceClaim();
         final Path containerPath = containers.get(resourceClaim.getContainer());
@@ -875,6 +880,17 @@ public class FileSystemRepository implements ContentRepository {
     }
 
     @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        if (claim == null) {
+            return new ByteArrayInputStream(new byte[0]);
+        }
+
+        final Path path = getPath(claim, true);
+        final FileInputStream fis = new FileInputStream(path.toFile());
+        return fis;
+    }
+
+    @Override
     public InputStream read(final ContentClaim claim) throws IOException {
         if (claim == null) {
             return new ByteArrayInputStream(new byte[0]);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index ba5a3f9..a8c867f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -469,6 +469,11 @@ public class VolatileContentRepository implements ContentRepository {
     }
 
     @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        return read(new StandardContentClaim(claim, 0L));
+    }
+
+    @Override
     public OutputStream write(final ContentClaim claim) throws IOException {
         final ContentClaim backupClaim = getBackupClaim(claim);
         return backupClaim == null ? getContent(claim).write() : getBackupRepository().write(backupClaim);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
index 676639c..f9041d9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
@@ -16,16 +16,10 @@
  */
 package org.apache.nifi.controller.repository.crypto;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.security.KeyManagementException;
-import javax.crypto.CipherOutputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.FileSystemRepository;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.security.kms.EncryptionException;
 import org.apache.nifi.security.kms.KeyProvider;
@@ -40,6 +34,14 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.crypto.CipherOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.security.KeyManagementException;
+
 /**
  * This class is an implementation of the {@link FileSystemRepository} content repository which provides transparent
  * streaming encryption/decryption of content claim data during file system interaction. As of Apache NiFi 1.10.0
@@ -155,6 +157,16 @@ public class EncryptedFileSystemRepository extends FileSystemRepository {
         return super.exportTo(claim, destination, append, offset, length);
     }
 
+    @Override
+    public InputStream read(final ResourceClaim claim) {
+        throw new UnsupportedOperationException("Cannot read full ResourceClaim as a Stream when using EncryptedFileSystemRepository");
+    }
+
+    @Override
+    public boolean isResourceClaimStreamSupported() {
+        return false;
+    }
+
     /**
      * Returns an InputStream (actually a {@link javax.crypto.CipherInputStream}) which wraps
      * the {@link java.io.FileInputStream} from the content repository claim on disk. This
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy
index 5c03690..7425f33 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepositoryTest.groovy
@@ -161,7 +161,7 @@ class EncryptedFileSystemRepositoryTest {
 
     @Test
     void testReadNullContentClaimShouldReturnEmptyInputStream() {
-        final InputStream inputStream = repository.read(null)
+        final InputStream inputStream = repository.read((ContentClaim) null)
         final int read = inputStream.read()
         assert read == -1
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 505bc3f..9c65dce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -2745,6 +2745,10 @@ public class StandardProcessSessionIT {
 
         private Path getPath(final ContentClaim contentClaim) {
             final ResourceClaim claim = contentClaim.getResourceClaim();
+            return getPath(claim);
+        }
+
+        private Path getPath(final ResourceClaim claim) {
             return Paths.get("target").resolve("contentRepo").resolve(claim.getContainer()).resolve(claim.getSection()).resolve(claim.getId());
         }
 
@@ -2807,6 +2811,23 @@ public class StandardProcessSessionIT {
         }
 
         @Override
+        public InputStream read(final ResourceClaim claim) throws IOException {
+            if (disableRead) {
+                throw new IOException("Reading from repo is disabled by unit test");
+            }
+
+            if (claim == null) {
+                return new ByteArrayInputStream(new byte[0]);
+            }
+
+            try {
+                return new FileInputStream(getPath(claim).toFile());
+            } catch (final FileNotFoundException fnfe) {
+                throw new ContentNotFoundException(null, fnfe);
+            }
+        }
+
+        @Override
         public OutputStream write(final ContentClaim claim) throws IOException {
             final Path path = getPath(claim);
             final File file = path.toFile();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
index 9c42d7e..f20acc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
@@ -214,6 +214,20 @@ public class ByteArrayContentRepository implements ContentRepository {
     }
 
     @Override
+    public InputStream read(final ResourceClaim claim) throws IOException {
+        if (claim == null) {
+            return new ByteArrayInputStream(new byte[0]);
+        }
+
+        if (!(claim instanceof ByteArrayResourceClaim)) {
+            throw new IllegalArgumentException("Cannot access Resource Claim " + claim + " because the Resource Claim does not belong to this Content Repository");
+        }
+
+        final ByteArrayResourceClaim byteArrayResourceClaim = (ByteArrayResourceClaim) claim;
+        return byteArrayResourceClaim.read();
+    }
+
+    @Override
     public OutputStream write(final ContentClaim claim) {
         final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);
         return byteArrayContentClaim.writeTo();
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java
index f84ca48..bc88efe 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedSchemaRecordWriter.java
@@ -16,49 +16,28 @@
  */
 package org.apache.nifi.provenance;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.security.KeyManagementException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.nifi.provenance.serialization.StorageSummary;
-import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter {
     private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordWriter.class);
-
-    private static final int DEFAULT_DEBUG_FREQUENCY = 1_000_000;
+    public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
+    public static final int SERIALIZATION_VERSION = 1;
 
     private ProvenanceEventEncryptor provenanceEventEncryptor;
-
-    private static final TimedBuffer<TimestampedLong> encryptTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-
     private String keyId;
 
-    private int debugFrequency;
-    public static final int SERIALIZATION_VERSION = 1;
-
-    public static final String SERIALIZATION_NAME = "EncryptedSchemaRecordWriter";
-
-    public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
-                                       final int uncompressedBlockSize, final IdentifierLookup idLookup,
-                                       ProvenanceEventEncryptor provenanceEventEncryptor) throws IOException, EncryptionException {
-        this(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup, provenanceEventEncryptor, DEFAULT_DEBUG_FREQUENCY);
-    }
-
     public EncryptedSchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter writer, final boolean compressed,
                                        final int uncompressedBlockSize, final IdentifierLookup idLookup,
                                        ProvenanceEventEncryptor provenanceEventEncryptor, int debugFrequency) throws IOException, EncryptionException {
         super(file, idGenerator, writer, compressed, uncompressedBlockSize, idLookup);
         this.provenanceEventEncryptor = provenanceEventEncryptor;
-        this.debugFrequency = debugFrequency;
 
         try {
             this.keyId = getNextAvailableKeyId();
@@ -69,101 +48,21 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter
     }
 
     @Override
-    public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
-        final long encryptStart = System.nanoTime();
-        byte[] cipherBytes;
+    protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException {
+        final byte[] serialized = super.serializeEvent(event);
+        final String eventId = event.getBestEventIdentifier();
+
         try {
-            byte[] serialized;
-            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
-                 final DataOutputStream dos = new DataOutputStream(baos)) {
-                writeRecord(record, 0L, dos);
-                serialized = baos.toByteArray();
-            }
-            String eventId = record.getBestEventIdentifier();
-            cipherBytes = encrypt(serialized, eventId);
+            final byte[] cipherBytes = encrypt(serialized, eventId);
+            return cipherBytes;
         } catch (EncryptionException e) {
             logger.error("Encountered an error: ", e);
             throw new IOException("Error encrypting the provenance record", e);
         }
-        final long encryptStop = System.nanoTime();
-
-        final long lockStart = System.nanoTime();
-        final long writeStart;
-        final long startBytes;
-        final long endBytes;
-        final long recordIdentifier;
-        synchronized (this) {
-            writeStart = System.nanoTime();
-            try {
-                recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
-                startBytes = getBytesWritten();
-
-                ensureStreamState(recordIdentifier, startBytes);
-
-                final DataOutputStream out = getBufferedOutputStream();
-                final int recordIdOffset = (int) (recordIdentifier - getFirstEventId());
-                out.writeInt(recordIdOffset);
-                out.writeInt(cipherBytes.length);
-                out.write(cipherBytes);
-
-                getRecordCount().incrementAndGet();
-                endBytes = getBytesWritten();
-            } catch (final IOException ioe) {
-                markDirty();
-                throw ioe;
-            }
-        }
-
-        if (logger.isDebugEnabled()) {
-            // Collect stats and periodically dump them if log level is set to at least info.
-            final long writeNanos = System.nanoTime() - writeStart;
-            getWriteTimes().add(new TimestampedLong(writeNanos));
-
-            final long serializeNanos = lockStart - encryptStart;
-            getSerializeTimes().add(new TimestampedLong(serializeNanos));
-
-            final long encryptNanos = encryptStop - encryptStart;
-            getEncryptTimes().add(new TimestampedLong(encryptNanos));
-
-            final long lockNanos = writeStart - lockStart;
-            getLockTimes().add(new TimestampedLong(lockNanos));
-            getBytesWrittenBuffer().add(new TimestampedLong(endBytes - startBytes));
-
-            final long recordCount = getTotalRecordCount().incrementAndGet();
-            if (recordCount % debugFrequency == 0) {
-                printStats();
-            }
-        }
-
-        final long serializedLength = endBytes - startBytes;
-        final TocWriter tocWriter = getTocWriter();
-        final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
-        final File file = getFile();
-        final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
-        return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
-    }
-
-    private void printStats() {
-        final long sixtySecondsAgo = System.currentTimeMillis() - 60000L;
-        final Long writeNanosLast60 = getWriteTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long lockNanosLast60 = getLockTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long serializeNanosLast60 = getSerializeTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long encryptNanosLast60 = getEncryptTimes().getAggregateValue(sixtySecondsAgo).getValue();
-        final Long bytesWrittenLast60 = getBytesWrittenBuffer().getAggregateValue(sixtySecondsAgo).getValue();
-        logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events, {} millis encrypting events",
-                TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
-                bytesWrittenLast60 / 1024 / 1024,
-                TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
-                TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60),
-                TimeUnit.NANOSECONDS.toMillis(encryptNanosLast60));
-    }
-
-    static TimedBuffer<TimestampedLong> getEncryptTimes() {
-        return encryptTimes;
     }
 
-    private byte[] encrypt(byte[] serialized, String eventId) throws IOException, EncryptionException {
-        String keyId = getKeyId();
+    private byte[] encrypt(byte[] serialized, String eventId) throws EncryptionException {
+        final String keyId = getKeyId();
         try {
             return provenanceEventEncryptor.encrypt(serialized, eventId, keyId);
         } catch (Exception e) {
@@ -192,8 +91,6 @@ public class EncryptedSchemaRecordWriter extends EventIdFirstSchemaRecordWriter
 
     @Override
     public String toString() {
-        return "EncryptedSchemaRecordWriter" +
-                " using " + provenanceEventEncryptor +
-                " and current keyId " + keyId;
+        return "EncryptedSchemaRecordWriter[keyId=" + keyId + ", encryptor=" + provenanceEventEncryptor + "]";
     }
 }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java
index f49e8a6..24ee0b5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepository.java
@@ -16,10 +16,6 @@
  */
 package org.apache.nifi.provenance;
 
-import java.io.IOException;
-import java.security.KeyManagementException;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.SecretKeySpec;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.nifi.authorization.Authorizer;
@@ -38,6 +34,11 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.IOException;
+import java.security.KeyManagementException;
+
 /**
  * This class is an implementation of the {@link WriteAheadProvenanceRepository} provenance repository which provides transparent
  * block encryption/decryption of provenance event data during file system interaction. As of Apache NiFi 1.10.0
@@ -53,10 +54,13 @@ public class EncryptedWriteAheadProvenanceRepository extends WriteAheadProvenanc
     /**
      * This constructor exists solely for the use of the Java Service Loader mechanism and should not be used.
      */
+    @SuppressWarnings("unused")
     public EncryptedWriteAheadProvenanceRepository() {
         super();
     }
 
+    // Created via reflection from FlowController
+    @SuppressWarnings("unused")
     public EncryptedWriteAheadProvenanceRepository(final NiFiProperties nifiProperties) {
         super(RepositoryConfiguration.create(nifiProperties));
     }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
index 05e6736..a60dcf4 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.provenance;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.provenance.schema.EventFieldNames;
 import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
 import org.apache.nifi.provenance.schema.LookupTableEventRecord;
@@ -37,21 +24,24 @@ import org.apache.nifi.provenance.schema.LookupTableEventSchema;
 import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.StorageSummary;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.provenance.util.ByteArrayDataOutputStream;
-import org.apache.nifi.provenance.util.ByteArrayDataOutputStreamCache;
 import org.apache.nifi.repository.schema.FieldMapRecord;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.repository.schema.RecordSchema;
 import org.apache.nifi.repository.schema.SchemaRecordWriter;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
-    private static final Logger logger = LoggerFactory.getLogger(EventIdFirstSchemaRecordWriter.class);
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
+public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
     private static final RecordSchema eventSchema = LookupTableEventSchema.EVENT_SCHEMA;
     private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
     private static final RecordSchema previousContentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.PREVIOUS_CONTENT_CLAIM).getSubFields());
@@ -70,14 +60,6 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
     private static final Map<String, Integer> eventTypeMap;
     private static final List<String> eventTypeNames;
 
-    private static final TimedBuffer<TimestampedLong> serializeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final TimedBuffer<TimestampedLong> lockTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final TimedBuffer<TimestampedLong> writeTimes = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final TimedBuffer<TimestampedLong> bytesWritten = new TimedBuffer<>(TimeUnit.SECONDS, 60, new LongEntityAccess());
-    private static final AtomicLong totalRecordCount = new AtomicLong(0L);
-
-    private static final ByteArrayDataOutputStreamCache streamCache = new ByteArrayDataOutputStreamCache(32, 8 * 1024, 256 * 1024);
-
     private long firstEventId;
     private long systemTimeOffset;
 
@@ -102,94 +84,89 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
         queueIdMap = idLookup.invertQueueIdentifiers();
     }
 
-    public EventIdFirstSchemaRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed,
-        final int uncompressedBlockSize, final IdentifierLookup idLookup) throws IOException {
-        super(out, storageLocation, idGenerator, tocWriter, compressed, uncompressedBlockSize);
-
-        this.idLookup = idLookup;
-        componentIdMap = idLookup.invertComponentIdentifiers();
-        componentTypeMap = idLookup.invertComponentTypes();
-        queueIdMap = idLookup.invertQueueIdentifiers();
-    }
 
     @Override
-    public StorageSummary writeRecord(final ProvenanceEventRecord record) throws IOException {
+    public Map<ProvenanceEventRecord, StorageSummary> writeRecords(final Iterable<ProvenanceEventRecord> events) throws IOException {
         if (isDirty()) {
             throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
         }
 
-        final long lockStart;
-        final long writeStart;
-        final long startBytes;
-        final long endBytes;
-        final long recordIdentifier;
-
-        final long serializeStart = System.nanoTime();
-        final ByteArrayDataOutputStream bados = streamCache.checkOut();
-        try {
-            writeRecord(record, 0L, bados.getDataOutputStream());
-
-            lockStart = System.nanoTime();
-            synchronized (this) {
-                writeStart = System.nanoTime();
-                try {
-                    recordIdentifier = record.getEventId() == -1L ? getIdGenerator().getAndIncrement() : record.getEventId();
-                    startBytes = getBytesWritten();
-
-                    ensureStreamState(recordIdentifier, startBytes);
-
-                    final DataOutputStream out = getBufferedOutputStream();
-                    final int recordIdOffset = (int) (recordIdentifier - firstEventId);
-                    out.writeInt(recordIdOffset);
-
-                    final ByteArrayOutputStream baos = bados.getByteArrayOutputStream();
-                    out.writeInt(baos.size());
-                    baos.writeTo(out);
-
-                    recordCount.incrementAndGet();
-                    endBytes = getBytesWritten();
-                } catch (final IOException ioe) {
-                    markDirty();
-                    throw ioe;
-                }
+        final int heapThreshold = 1_000_000;
+        final Map<ProvenanceEventRecord, StorageSummary> storageSummaries = new LinkedHashMap<>(); // insertion-ordered: callers iterate the result
+
+        final Map<ProvenanceEventRecord, byte[]> serializedEvents = new LinkedHashMap<>();
+        int totalBytes = 0;
+
+        for (final ProvenanceEventRecord event : events) {
+            final byte[] serialized = serializeEvent(event);
+            serializedEvents.put(event, serialized);
+            totalBytes += serialized.length;
+
+            if (totalBytes >= heapThreshold) {
+                storeEvents(serializedEvents, storageSummaries);
+                recordCount.addAndGet(serializedEvents.size());
+                serializedEvents.clear();
+                totalBytes = 0;
             }
-        } finally {
-            streamCache.checkIn(bados);
         }
 
-        if (logger.isDebugEnabled()) {
-            // Collect stats and periodically dump them if log level is set to at least info.
-            final long writeNanos = System.nanoTime() - writeStart;
-            writeTimes.add(new TimestampedLong(writeNanos));
-
-            final long serializeNanos = lockStart - serializeStart;
-            serializeTimes.add(new TimestampedLong(serializeNanos));
-
-            final long lockNanos = writeStart - lockStart;
-            lockTimes.add(new TimestampedLong(lockNanos));
-            bytesWritten.add(new TimestampedLong(endBytes - startBytes));
-
-            final long recordCount = totalRecordCount.incrementAndGet();
-            if (recordCount % 1_000_000 == 0) {
-                final long sixtySecondsAgo = System.currentTimeMillis() - 60000L;
-                final Long writeNanosLast60 = writeTimes.getAggregateValue(sixtySecondsAgo).getValue();
-                final Long lockNanosLast60 = lockTimes.getAggregateValue(sixtySecondsAgo).getValue();
-                final Long serializeNanosLast60 = serializeTimes.getAggregateValue(sixtySecondsAgo).getValue();
-                final Long bytesWrittenLast60 = bytesWritten.getAggregateValue(sixtySecondsAgo).getValue();
-                logger.debug("In the last 60 seconds, have spent {} millis writing to file ({} MB), {} millis waiting on synchronize block, {} millis serializing events",
-                    TimeUnit.NANOSECONDS.toMillis(writeNanosLast60),
-                    bytesWrittenLast60 / 1024 / 1024,
-                    TimeUnit.NANOSECONDS.toMillis(lockNanosLast60),
-                    TimeUnit.NANOSECONDS.toMillis(serializeNanosLast60));
+        storeEvents(serializedEvents, storageSummaries);
+        recordCount.addAndGet(serializedEvents.size());
+
+        return storageSummaries;
+    }
+
+    protected byte[] serializeEvent(final ProvenanceEventRecord event) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream dataOutputStream = new DataOutputStream(baos)) {
+            writeRecord(event, 0L, dataOutputStream);
+            dataOutputStream.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    private synchronized void storeEvents(final Map<ProvenanceEventRecord, byte[]> serializedEvents, final Map<ProvenanceEventRecord, StorageSummary> summaryMap) throws IOException {
+        for (final Map.Entry<ProvenanceEventRecord, byte[]> entry : serializedEvents.entrySet()) {
+            final ProvenanceEventRecord event = entry.getKey();
+            final byte[] serialized = entry.getValue();
+
+            final long startBytes;
+            final long endBytes;
+            final long recordIdentifier;
+
+            try {
+                recordIdentifier = event.getEventId() == -1 ? getIdGenerator().getAndIncrement() : event.getEventId();
+                startBytes = getBytesWritten();
+
+                ensureStreamState(recordIdentifier, startBytes);
+
+                final DataOutputStream out = getBufferedOutputStream();
+                final int recordIdOffset = (int) (recordIdentifier - firstEventId);
+                out.writeInt(recordIdOffset);
+
+                out.writeInt(serialized.length);
+                out.write(serialized);
+
+                endBytes = getBytesWritten();
+            } catch (final IOException ioe) {
+                markDirty();
+                throw ioe;
             }
+
+            final long serializedLength = endBytes - startBytes;
+            final TocWriter tocWriter = getTocWriter();
+            final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
+            final File file = getFile();
+            final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
+            final StorageSummary storageSummary = new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
+            summaryMap.put(event, storageSummary);
         }
+    }
 
-        final long serializedLength = endBytes - startBytes;
-        final TocWriter tocWriter = getTocWriter();
-        final Integer blockIndex = tocWriter == null ? null : tocWriter.getCurrentBlockIndex();
-        final File file = getFile();
-        final String storageLocation = file.getParentFile().getName() + "/" + file.getName();
-        return new StorageSummary(recordIdentifier, storageLocation, blockIndex, serializedLength, endBytes);
+    @Override
+    public StorageSummary writeRecord(final ProvenanceEventRecord record) {
+        // This method should never be called because it's only called by super.writeRecords. That method is overridden in this class and never delegates to this method.
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -245,47 +222,4 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter {
     protected String getSerializationName() {
         return SERIALIZATION_NAME;
     }
-
-    /* Getters for internal state written to by subclass EncryptedSchemaRecordWriter */
-
-    IdentifierLookup getIdLookup() {
-        return idLookup;
-    }
-
-    SchemaRecordWriter getSchemaRecordWriter() {
-        return schemaRecordWriter;
-    }
-
-    AtomicInteger getRecordCount() {
-        return recordCount;
-    }
-
-    static TimedBuffer<TimestampedLong> getSerializeTimes() {
-        return serializeTimes;
-    }
-
-    static TimedBuffer<TimestampedLong> getLockTimes() {
-        return lockTimes;
-    }
-
-    static TimedBuffer<TimestampedLong> getWriteTimes() {
-        return writeTimes;
-    }
-
-    static TimedBuffer<TimestampedLong> getBytesWrittenBuffer() {
-        return bytesWritten;
-    }
-
-    static AtomicLong getTotalRecordCount() {
-        return totalRecordCount;
-    }
-
-    long getFirstEventId() {
-        return firstEventId;
-    }
-
-    long getSystemTimeOffset() {
-        return systemTimeOffset;
-    }
-
 }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
index 2dc249d..07eab61 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
@@ -108,6 +108,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
         config = null;
     }
 
+    // Created via reflection from FlowController
+    @SuppressWarnings("unused")
     public WriteAheadProvenanceRepository(final NiFiProperties nifiProperties) {
         this(RepositoryConfiguration.create(nifiProperties));
     }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
index 21e2c07..bb679e7 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -17,14 +17,6 @@
 
 package org.apache.nifi.provenance.serialization;
 
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.nifi.provenance.AbstractRecordWriter;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.toc.TocWriter;
@@ -34,6 +26,14 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
 public abstract class CompressableRecordWriter extends AbstractRecordWriter {
     private static final Logger logger = LoggerFactory.getLogger(CompressableRecordWriter.class);
 
@@ -145,7 +145,7 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
         }
     }
 
-    protected synchronized void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
+    protected void ensureStreamState(final long recordIdentifier, final long startBytes) throws IOException {
         // add a new block to the TOC if needed.
         if (getTocWriter() != null && (startBytes - blockStartOffset >= uncompressedBlockSize)) {
             blockStartOffset = startBytes;
@@ -222,4 +222,5 @@ public abstract class CompressableRecordWriter extends AbstractRecordWriter {
     protected abstract int getSerializationVersion();
 
     protected abstract String getSerializationName();
+
 }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 475c750..ad0c5b5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.provenance.serialization;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.zip.GZIPInputStream;
 import org.apache.nifi.properties.NiFiPropertiesLoader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
@@ -44,6 +33,18 @@ import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.zip.GZIPInputStream;
+
 public class RecordReaders {
 
     private static Logger logger = LoggerFactory.getLogger(RecordReaders.class);
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index c9d2a22..5b5a9d6 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.provenance.serialization;
 
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.toc.TocWriter;
+import java.util.HashMap;
+import java.util.Map;
 
 public interface RecordWriter extends Closeable {
 
@@ -42,6 +44,17 @@ public interface RecordWriter extends Closeable {
      */
     StorageSummary writeRecord(ProvenanceEventRecord record) throws IOException;
 
+    default Map<ProvenanceEventRecord, StorageSummary> writeRecords(Iterable<ProvenanceEventRecord> events) throws IOException {
+        final Map<ProvenanceEventRecord, StorageSummary> locationMap = new HashMap<>();
+
+        for (final ProvenanceEventRecord nextEvent : events) {
+            final StorageSummary writerSummary = writeRecord(nextEvent);
+            locationMap.put(nextEvent, writerSummary);
+        }
+
+        return locationMap;
+    }
+
     /**
      * Flushes any data that is held in a buffer to the underlying storage mechanism
      *
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
index 1df84f6..c32de77 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java
@@ -320,11 +320,15 @@ public class WriteAheadStorePartition implements EventStorePartition {
         try {
             long maxId = -1L;
             int numEvents = 0;
-            for (final ProvenanceEventRecord nextEvent : events) {
-                final StorageSummary writerSummary = writer.writeRecord(nextEvent);
+
+            final Map<ProvenanceEventRecord, StorageSummary> writerSummaries = writer.writeRecords(events);
+            for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : writerSummaries.entrySet()) {
+                final ProvenanceEventRecord eventRecord = entry.getKey();
+                final StorageSummary writerSummary = entry.getValue();
+
                 final StorageSummary summaryWithIndex = new StorageSummary(writerSummary.getEventId(), writerSummary.getStorageLocation(), this.partitionName,
                     writerSummary.getBlockIndex(), writerSummary.getSerializedLength(), writerSummary.getBytesWritten());
-                locationMap.put(nextEvent, summaryWithIndex);
+                locationMap.put(eventRecord, summaryWithIndex);
                 maxId = summaryWithIndex.getEventId();
                 numEvents++;
             }
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
deleted file mode 100644
index 23aefb3..0000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStream.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.provenance.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-
-public class ByteArrayDataOutputStream {
-    private final ByteArrayOutputStream baos;
-    private final DataOutputStream dos;
-
-    public ByteArrayDataOutputStream(final int initialCapacity) {
-        baos = new ByteArrayOutputStream(initialCapacity);
-        dos = new DataOutputStream(baos);
-    }
-
-    public ByteArrayOutputStream getByteArrayOutputStream() {
-        return baos;
-    }
-
-    public DataOutputStream getDataOutputStream() {
-        return dos;
-    }
-}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
deleted file mode 100644
index 9535590..0000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/ByteArrayDataOutputStreamCache.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.provenance.util;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class ByteArrayDataOutputStreamCache {
-    private final BlockingQueue<ByteArrayDataOutputStream> queue;
-    private final int initialBufferSize;
-    private final int maxBufferSize;
-
-    public ByteArrayDataOutputStreamCache(final int maxCapacity, final int initialBufferSize, final int maxBufferSize) {
-        this.queue = new LinkedBlockingQueue<>(maxCapacity);
-        this.initialBufferSize = initialBufferSize;
-        this.maxBufferSize = maxBufferSize;
-    }
-
-    public ByteArrayDataOutputStream checkOut() {
-        final ByteArrayDataOutputStream stream = queue.poll();
-        if (stream != null) {
-            return stream;
-        }
-
-        return new ByteArrayDataOutputStream(initialBufferSize);
-    }
-
-    public void checkIn(final ByteArrayDataOutputStream bados) {
-        final int size = bados.getByteArrayOutputStream().size();
-        if (size > maxBufferSize) {
-            return;
-        }
-
-        bados.getByteArrayOutputStream().reset();
-        queue.offer(bados);
-    }
-}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
index c146982..784eb9a 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
@@ -199,7 +199,7 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
         // Act
         int encryptedRecordId = idGenerator.get()
         encryptedWriter.writeHeader(encryptedRecordId)
-        encryptedWriter.writeRecord(record)
+        encryptedWriter.writeRecords(Collections.singletonList(record))
         encryptedWriter.close()
         logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
 
@@ -242,13 +242,13 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
         // Act
         int standardRecordId = idGenerator.get()
         standardWriter.writeHeader(standardRecordId)
-        standardWriter.writeRecord(record)
+        standardWriter.writeRecords(Collections.singletonList(record))
         standardWriter.close()
         logger.info("Wrote standard record ${standardRecordId} to journal")
 
         int encryptedRecordId = idGenerator.get()
         encryptedWriter.writeHeader(encryptedRecordId)
-        encryptedWriter.writeRecord(record)
+        encryptedWriter.writeRecords(Collections.singletonList(record))
         encryptedWriter.close()
         logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
 
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
index 4b2ca50..ddb6d02 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -17,21 +17,6 @@
 
 package org.apache.nifi.provenance;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.StandardTocReader;
@@ -43,6 +28,23 @@ import org.apache.nifi.util.file.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 
 public abstract class AbstractTestRecordReaderWriter {
     @BeforeClass
@@ -62,7 +64,7 @@ public abstract class AbstractTestRecordReaderWriter {
         final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
 
         writer.writeHeader(1L);
-        writer.writeRecord(createEvent());
+        writer.writeRecords(Collections.singletonList(createEvent()));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -96,7 +98,7 @@ public abstract class AbstractTestRecordReaderWriter {
         final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
 
         writer.writeHeader(1L);
-        writer.writeRecord(createEvent());
+        writer.writeRecords(Collections.singletonList(createEvent()));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -117,7 +119,7 @@ public abstract class AbstractTestRecordReaderWriter {
 
         writer.writeHeader(1L);
         for (int i = 0; i < 10; i++) {
-            writer.writeRecord(createEvent());
+            writer.writeRecords(Collections.singletonList(createEvent()));
         }
         writer.close();
 
@@ -156,7 +158,7 @@ public abstract class AbstractTestRecordReaderWriter {
 
         writer.writeHeader(1L);
         for (int i = 0; i < 10; i++) {
-            writer.writeRecord(createEvent());
+            writer.writeRecords(Collections.singletonList(createEvent()));
         }
         writer.close();
 
@@ -198,7 +200,7 @@ public abstract class AbstractTestRecordReaderWriter {
         for (int i = 0; i < numEvents; i++) {
             final ProvenanceEventRecord event = createEvent();
             events.add(event);
-            writer.writeRecord(event);
+            writer.writeRecords(Collections.singletonList(event));
         }
         writer.close();
 
@@ -208,6 +210,8 @@ public abstract class AbstractTestRecordReaderWriter {
             final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
 
             for (int i = 0; i < numEvents; i++) {
+                System.out.println(i);
+
                 final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(i);
                 assertTrue(eventOption.isPresent());
                 assertEquals(i, eventOption.get().getEventId());
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
index a6833b4..b6b86a2 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestEventIdFirstSchemaRecordReaderWriter.java
@@ -17,9 +17,19 @@
 
 package org.apache.nifi.provenance;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -34,19 +44,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.toc.StandardTocReader;
-import org.apache.nifi.provenance.toc.StandardTocWriter;
-import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocUtil;
-import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
     private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -88,7 +88,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -146,7 +146,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -203,7 +203,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -261,7 +261,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(1L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -322,7 +322,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         final ProvenanceEventRecord record = builder.build();
 
         writer.writeHeader(500_000L);
-        writer.writeRecord(record);
+        writer.writeRecords(Collections.singletonList(record));
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
@@ -382,12 +382,12 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
         builder.setCurrentContentClaim("container-2", "section-2", "identifier-2", 2L, 2L);
 
         writer.writeHeader(500_000L);
-        writer.writeRecord(builder.build());
+        writer.writeRecords(Collections.singletonList(builder.build()));
 
         builder.setEventId(1_000_001L);
         builder.setComponentId("4444");
         builder.setComponentType("unit-test-component-1");
-        writer.writeRecord(builder.build());
+        writer.writeRecords(Collections.singletonList(builder.build()));
 
         writer.close();
 
@@ -435,7 +435,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
             writer.writeHeader(0L);
 
             for (int i = 0; i < 100_000; i++) {
-                writer.writeRecord(createEvent());
+                writer.writeRecords(Collections.singletonList(createEvent()));
             }
         }