You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2020/01/10 18:48:33 UTC

[nifi] branch master updated: NIFI-3833 Added encrypted flowfile repository implementation. Added EncryptedSchemaRepositoryRecordSerde. Refactored CryptoUtils utility methods for repository encryption configuration validation checks to RepositoryEncryptorUtils. Added FlowFile repo encryption config container. Added more logging in cryptographic and serialization operations. Generalized log messages in shared encryption services. Added encrypted serde factory. Added marker impl for encrypted WAL. Moved validation of FF repo encryption config earlier in startup process. Refactored duplicate property lookup code in NiFiProperties. Added title case string helper. Added validation and warning around misformatted encryption repo properties. Added unit tests. Added documentation to User Guide & Admin Guide. Added screenshot for docs. Added links to relevant sections of NiFi In-Depth doc to User Guide. Added flowfile & content repository encryption configuration properties to default nifi.properties.

This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cc467e  NIFI-3833 Added encrypted flowfile repository implementation. Added EncryptedSchemaRepositoryRecordSerde. Refactored CryptoUtils utility methods for repository encryption configuration validation checks to RepositoryEncryptorUtils. Added FlowFile repo encryption config container. Added more logging in cryptographic and serialization operations. Generalized log messages in shared encryption services. Added encrypted serde factory. Added marker impl for encrypted WAL. Moved validation of FF repo encryption config earlier in startup process. Refactored duplicate property lookup code in NiFiProperties. Added title case string helper. Added validation and warning around misformatted encryption repo properties. Added unit tests. Added documentation to User Guide & Admin Guide. Added screenshot for docs. Added links to relevant sections of NiFi In-Depth doc to User Guide. Added flowfile & content repository encryption configuration properties to default nifi.properties.
2cc467e is described below

commit 2cc467eb587247f1e8e0f02285ee01acc4ec9918
Author: Andy LoPresto <al...@apache.org>
AuthorDate: Wed Oct 23 15:18:25 2019 -0700

    NIFI-3833 Added encrypted flowfile repository implementation.
    Added EncryptedSchemaRepositoryRecordSerde.
    Refactored CryptoUtils utility methods for repository encryption configuration validation checks to RepositoryEncryptorUtils.
    Added FlowFile repo encryption config container.
    Added more logging in cryptographic and serialization operations.
    Generalized log messages in shared encryption services.
    Added encrypted serde factory.
    Added marker impl for encrypted WAL.
    Moved validation of FF repo encryption config earlier in startup process.
    Refactored duplicate property lookup code in NiFiProperties.
    Added title case string helper.
    Added validation and warning around misformatted encryption repo properties.
    Added unit tests.
    Added documentation to User Guide & Admin Guide.
    Added screenshot for docs.
    Added links to relevant sections of NiFi In-Depth doc to User Guide.
    Added flowfile & content repository encryption configuration properties to default nifi.properties.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
    Signed-off-by: Mark Payne <ma...@hotmail.com>
    
    This closes #3968.
---
 .../java/org/apache/nifi/util/NiFiProperties.java  | 187 ++++++++---
 .../java/org/apache/nifi/util/StringUtils.java     |  27 ++
 .../java/org/apache/nifi/util/StringUtilsTest.java |  20 +-
 .../org/apache/nifi/security/kms/CryptoUtils.java  |  93 ++----
 .../nifi/security/kms/EncryptionException.java     |   3 +
 .../nifi/security/kms/KeyProviderFactory.java      |  12 +-
 .../repository/RepositoryEncryptorUtils.java       | 208 ++++++++++++-
 .../block/aes/RepositoryObjectAESGCMEncryptor.java |  13 +-
 .../FlowFileRepositoryEncryptionConfiguration.java |  67 ++++
 .../config/RepositoryEncryptionConfiguration.java  |   1 +
 .../repository/RepositoryEncryptorUtilsTest.groovy | 152 +++++++++
 .../nifi/wali/ByteArrayDataOutputStream.java       |   4 +-
 .../nifi/wali/SequentialAccessWriteAheadLog.java   |  19 +-
 .../src/main/asciidoc/administration-guide.adoc    |  32 +-
 .../asciidoc/images/encrypted-flowfile-hex.png     | Bin 0 -> 1974311 bytes
 nifi-docs/src/main/asciidoc/user-guide.adoc        | 105 ++++++-
 .../nifi-flowfile-repo-serialization/pom.xml       |  23 ++
 .../EncryptedRepositoryRecordSerdeFactory.java     |  68 ++++
 .../EncryptedSchemaRepositoryRecordSerde.java      | 344 +++++++++++++++++++++
 .../StandardRepositoryRecordSerdeFactory.java      |   3 +-
 .../WriteAheadRepositoryRecordSerde.java           |  10 +-
 ...ncryptedRepositoryRecordSerdeFactoryTest.groovy | 156 ++++++++++
 ...EncryptedSchemaRepositoryRecordSerdeTest.groovy | 317 +++++++++++++++++++
 .../src/test/resources/logback-test.xml            |  48 +++
 .../nifi-framework/nifi-framework-core/pom.xml     |   6 +
 .../org/apache/nifi/controller/FlowController.java | 140 ++++-----
 .../repository/WriteAheadFlowFileRepository.java   | 113 ++++---
 .../crypto/EncryptedFileSystemRepository.java      |  56 +---
 .../EncryptedSequentialAccessWriteAheadLog.java    |  52 ++++
 ...cryptedSequentialAccessWriteAheadLogTest.groovy | 266 ++++++++++++++++
 .../StandardNiFiPropertiesGroovyTest.groovy        | 219 +++++++++++++
 .../nifi-framework/nifi-resources/pom.xml          |   8 +
 .../src/main/resources/conf/nifi.properties        |   8 +
 .../nifi/provenance/RepositoryConfiguration.java   |   2 +
 .../provenance/serialization/RecordReaders.java    |   5 +-
 35 files changed, 2466 insertions(+), 321 deletions(-)

diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 38a2618..4c6a354 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -34,6 +34,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The NiFiProperties class holds all properties which are needed for various
@@ -44,6 +46,7 @@ import java.util.stream.Stream;
  * over time.
  */
 public abstract class NiFiProperties {
+    private static final Logger logger = LoggerFactory.getLogger(NiFiProperties.class);
 
     // core properties
     public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path";
@@ -100,10 +103,15 @@ public abstract class NiFiProperties {
 
     // flowfile repository properties
     public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation";
+    public static final String FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION = "nifi.flowfile.repository.wal.implementation";
     public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync";
     public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory";
     public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions";
     public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval";
+    public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY = "nifi.flowfile.repository.encryption.key";
+    public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID = "nifi.flowfile.repository.encryption.key.id";
+    public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS = "nifi.flowfile.repository.encryption.key.provider.implementation";
+    public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.flowfile.repository.encryption.key.provider.location";
     public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
     public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
     public static final String SWAP_IN_THREADS = "nifi.swap.in.threads";
@@ -246,7 +254,7 @@ public abstract class NiFiProperties {
     public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval";
     public static final String ANALYTICS_QUERY_INTERVAL = "nifi.analytics.query.interval";
     public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "nifi.analytics.connection.model.implementation";
-    public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name";
+    public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = "nifi.analytics.connection.model.score.name";
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";
 
     // defaults
@@ -977,6 +985,7 @@ public abstract class NiFiProperties {
      * Returns the claim to be used to identify a user.
      * Claim must be requested by adding the scope for it.
      * Default is 'email'.
+     *
      * @return The claim to be used to identify the user.
      */
     public String getOidcClaimIdentifyingUser() {
@@ -1321,6 +1330,138 @@ public abstract class NiFiProperties {
         return getPropertyKeys().size();
     }
 
+    public String getFlowFileRepoEncryptionKeyId() {
+        return getProperty(FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID);
+    }
+
+    /**
+     * Returns the active flowfile repository encryption key if a {@code StaticKeyProvider} is in use.
+     * If no key ID is specified in the properties file, the default
+     * {@code nifi.flowfile.repository.encryption.key} value is returned. If a key ID is specified in
+     * {@code nifi.flowfile.repository.encryption.key.id}, it will attempt to read from
+     * {@code nifi.flowfile.repository.encryption.key.id.XYZ} where {@code XYZ} is the provided key
+     * ID. If that value is empty, it will use the default property
+     * {@code nifi.flowfile.repository.encryption.key}.
+     *
+     * @return the flowfile repository encryption key in hex form
+     */
+    public String getFlowFileRepoEncryptionKey() {
+        String keyId = getFlowFileRepoEncryptionKeyId();
+        String keyKey = StringUtils.isBlank(keyId) ? FLOWFILE_REPOSITORY_ENCRYPTION_KEY : FLOWFILE_REPOSITORY_ENCRYPTION_KEY + ".id." + keyId;
+        return getProperty(keyKey, getProperty(FLOWFILE_REPOSITORY_ENCRYPTION_KEY));
+    }
+
+    /**
+     * Returns a map of keyId -> key in hex loaded from the {@code nifi.properties} file if a
+     * {@code StaticKeyProvider} is defined. If {@code FileBasedKeyProvider} is defined, use
+     * {@code CryptoUtils#readKeys()} instead -- this method will return an empty map.
+     *
+     * @return a Map of the keys identified by key ID
+     */
+    public Map<String, String> getFlowFileRepoEncryptionKeys() {
+        return getRepositoryEncryptionKeys("flowfile");
+    }
+
+    /**
+     * Returns the map of key IDs to keys retrieved from the properties for the given repository type.
+     *
+     * @param repositoryType "provenance", "content", or "flowfile"
+     * @return the key map
+     */
+    private Map<String, String> getRepositoryEncryptionKeys(String repositoryType) {
+        Map<String, String> keys = new HashMap<>();
+        List<String> keyProperties = getRepositoryEncryptionKeyProperties(repositoryType);
+        if (keyProperties.size() == 0) {
+            logger.warn("No " + repositoryType + " repository encryption key properties were available. Check the "
+                    + "exact format specified in the Admin Guide - Encrypted " + StringUtils.toTitleCase(repositoryType)
+                    + " Repository Properties");
+            return keys;
+        }
+        final String REPOSITORY_ENCRYPTION_KEY = getRepositoryEncryptionKey(repositoryType);
+        final String REPOSITORY_ENCRYPTION_KEY_ID = getRepositoryEncryptionKeyId(repositoryType);
+
+        // Retrieve the actual key values and store non-empty values in the map
+        for (String prop : keyProperties) {
+            logger.debug("Parsing " + prop);
+            final String value = getProperty(prop);
+            if (!StringUtils.isBlank(value)) {
+                // If this property is .key (the actual hex key), put it in the map under the value of .key.id (i.e. key1)
+                if (prop.equalsIgnoreCase(REPOSITORY_ENCRYPTION_KEY)) {
+                    keys.put(getProperty(REPOSITORY_ENCRYPTION_KEY_ID), value);
+                } else {
+                    // Extract nifi.*.repository.encryption.key.id.key1 -> key1
+                    String extractedKeyId = prop.substring(prop.lastIndexOf(".") + 1);
+                    if (keys.containsKey(extractedKeyId)) {
+                        logger.warn("The {} repository encryption key map already contains an entry for {}. Ignoring new value from {}", repositoryType, extractedKeyId, prop);
+                    } else {
+                        keys.put(extractedKeyId, value);
+                    }
+                }
+            }
+        }
+        return keys;
+    }
+
+    /**
+     * Returns the list of encryption key properties for the specified repository type. If an unknown repository type
+     * is provided, returns an empty list.
+     *
+     * @param repositoryType "provenance", "content", or "flowfile"
+     * @return the list of encryption key properties
+     */
+    private List<String> getRepositoryEncryptionKeyProperties(String repositoryType) {
+        switch (repositoryType.toLowerCase()) {
+            case "flowfile":
+                return getFlowFileRepositoryEncryptionKeyProperties();
+            case "content":
+                return getContentRepositoryEncryptionKeyProperties();
+            case "provenance":
+                return getProvenanceRepositoryEncryptionKeyProperties();
+            default:
+                return Collections.emptyList();
+        }
+    }
+
+    /**
+     * Returns the encryption key property key for the specified repository type. If an unknown repository type
+     * is provided, returns an empty string.
+     *
+     * @param repositoryType "provenance", "content", or "flowfile"
+     * @return the encryption key property (i.e. {@code FLOWFILE_REPOSITORY_ENCRYPTION_KEY})
+     */
+    private String getRepositoryEncryptionKey(String repositoryType) {
+        switch (repositoryType.toLowerCase()) {
+            case "flowfile":
+                return FLOWFILE_REPOSITORY_ENCRYPTION_KEY;
+            case "content":
+                return CONTENT_REPOSITORY_ENCRYPTION_KEY;
+            case "provenance":
+                return PROVENANCE_REPO_ENCRYPTION_KEY;
+            default:
+                return "";
+        }
+    }
+
+    /**
+     * Returns the encryption key ID property key for the specified repository type. If an unknown repository type
+     * is provided, returns an empty string.
+     *
+     * @param repositoryType "provenance", "content", or "flowfile"
+     * @return the encryption key ID property (i.e. {@code FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID})
+     */
+    private String getRepositoryEncryptionKeyId(String repositoryType) {
+        switch (repositoryType.toLowerCase()) {
+            case "flowfile":
+                return FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID;
+            case "content":
+                return CONTENT_REPOSITORY_ENCRYPTION_KEY_ID;
+            case "provenance":
+                return PROVENANCE_REPO_ENCRYPTION_KEY_ID;
+            default:
+                return "";
+        }
+    }
+
     public String getProvenanceRepoEncryptionKeyId() {
         return getProperty(PROVENANCE_REPO_ENCRYPTION_KEY_ID);
     }
@@ -1350,23 +1491,7 @@ public abstract class NiFiProperties {
      * @return a Map of the keys identified by key ID
      */
     public Map<String, String> getProvenanceRepoEncryptionKeys() {
-        Map<String, String> keys = new HashMap<>();
-        List<String> keyProperties = getProvenanceRepositoryEncryptionKeyProperties();
-
-        // Retrieve the actual key values and store non-empty values in the map
-        for (String prop : keyProperties) {
-            final String value = getProperty(prop);
-            if (!StringUtils.isBlank(value)) {
-                if (prop.equalsIgnoreCase(PROVENANCE_REPO_ENCRYPTION_KEY)) {
-                    prop = getProvenanceRepoEncryptionKeyId();
-                } else {
-                    // Extract nifi.provenance.repository.encryption.key.id.key1 -> key1
-                    prop = prop.substring(prop.lastIndexOf(".") + 1);
-                }
-                keys.put(prop, value);
-            }
-        }
-        return keys;
+        return getRepositoryEncryptionKeys("provenance");
     }
 
     public String getContentRepositoryEncryptionKeyId() {
@@ -1398,24 +1523,7 @@ public abstract class NiFiProperties {
      * @return a Map of the keys identified by key ID
      */
     public Map<String, String> getContentRepositoryEncryptionKeys() {
-        // TODO: Duplicate logic with different constants as provenance should be refactored to helper method
-        Map<String, String> keys = new HashMap<>();
-        List<String> keyProperties = getContentRepositoryEncryptionKeyProperties();
-
-        // Retrieve the actual key values and store non-empty values in the map
-        for (String prop : keyProperties) {
-            final String value = getProperty(prop);
-            if (!StringUtils.isBlank(value)) {
-                if (prop.equalsIgnoreCase(CONTENT_REPOSITORY_ENCRYPTION_KEY)) {
-                    prop = getContentRepositoryEncryptionKeyId();
-                } else {
-                    // Extract nifi.content.repository.encryption.key.id.key1 -> key1
-                    prop = prop.substring(prop.lastIndexOf(".") + 1);
-                }
-                keys.put(prop, value);
-            }
-        }
-        return keys;
+        return getRepositoryEncryptionKeys("content");
     }
 
     /**
@@ -1486,6 +1594,13 @@ public abstract class NiFiProperties {
         }
     }
 
+    private List<String> getFlowFileRepositoryEncryptionKeyProperties() {
+        // Filter all the property keys that define a key
+        return getPropertyKeys().stream().filter(k ->
+                k.startsWith(FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID + ".") || k.equalsIgnoreCase(FLOWFILE_REPOSITORY_ENCRYPTION_KEY)
+        ).collect(Collectors.toList());
+    }
+
     private List<String> getProvenanceRepositoryEncryptionKeyProperties() {
         // Filter all the property keys that define a key
         return getPropertyKeys().stream().filter(k ->
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
index 87f12a2..5418812 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
@@ -16,7 +16,10 @@
  */
 package org.apache.nifi.util;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * String Utils based on the Apache Commons Lang String Utils.
@@ -469,4 +472,28 @@ public class StringUtils {
         return str;
     }
 
+    /**
+     * Returns the string in "title case" (i.e. every word capitalized). If the input is {@code null} or blank, returns
+     * an empty string. Leading and trailing spaces are trimmed, and multiple internal spaces are condensed.
+     *
+     * Examples:
+     *
+     * this is a sentence -> This Is A Sentence
+     * allOneWord -> Alloneword
+     * PREVIOUSLY UPPERCASE -> Previously Uppercase
+     * multiple   spaces -> Multiple Spaces
+     *
+     * @param input the input string
+     * @return the titlecased string
+     */
+    public static String toTitleCase(String input) {
+        if (input == null || isBlank(input)) {
+            return "";
+        }
+        List<String> elements = Arrays.asList(input.trim().toLowerCase().split("\\s"));
+        return elements.stream()
+                .filter(word -> !StringUtils.isBlank(word))
+                .map(word -> Character.toTitleCase(word.charAt(0)) + word.substring(1))
+                .collect(Collectors.joining(" "));
+    }
 }
diff --git a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/StringUtilsTest.java b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/StringUtilsTest.java
index 1690bab..059857c 100644
--- a/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/StringUtilsTest.java
+++ b/nifi-commons/nifi-properties/src/test/java/org/apache/nifi/util/StringUtilsTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.util;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.junit.Test;
 
 public class StringUtilsTest {
@@ -106,4 +109,17 @@ public class StringUtilsTest {
     collection.add(null);
     assertEquals("test1,test2,null", StringUtils.join(collection, ","));
   }
+
+  @Test
+  public void testShouldTitleCaseStrings() {
+    // Arrange
+    List<String> inputs = Arrays.asList(null, "", "  leading space", "trailing space  ", "multiple   spaces", "this is a sentence", "allOneWord", "PREVIOUSLY UPPERCASE");
+    List<String> expected = Arrays.asList("", "", "Leading Space", "Trailing Space", "Multiple Spaces", "This Is A Sentence", "Alloneword", "Previously Uppercase");
+
+    // Act
+    List<String> titleCased = inputs.stream().map(StringUtils::toTitleCase).collect(Collectors.toList());
+
+    // Assert
+    assertEquals(titleCased, expected);
+  }
 }
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/CryptoUtils.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/CryptoUtils.java
index 0f3a19c..b4cecb4 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/CryptoUtils.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/CryptoUtils.java
@@ -43,6 +43,7 @@ import javax.crypto.IllegalBlockSizeException;
 import javax.crypto.SecretKey;
 import javax.crypto.spec.SecretKeySpec;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.security.repository.config.RepositoryEncryptionConfiguration;
 import org.apache.nifi.security.util.EncryptionMethod;
 import org.apache.nifi.security.util.crypto.AESKeyedCipherProvider;
 import org.apache.nifi.util.NiFiProperties;
@@ -52,11 +53,12 @@ import org.slf4j.LoggerFactory;
 
 public class CryptoUtils {
     private static final Logger logger = LoggerFactory.getLogger(CryptoUtils.class);
-    private static final String STATIC_KEY_PROVIDER_CLASS_NAME = "org.apache.nifi.security.kms.StaticKeyProvider";
-    private static final String FILE_BASED_KEY_PROVIDER_CLASS_NAME = "org.apache.nifi.security.kms.FileBasedKeyProvider";
+    public static final String STATIC_KEY_PROVIDER_CLASS_NAME = "org.apache.nifi.security.kms.StaticKeyProvider";
+    public static final String FILE_BASED_KEY_PROVIDER_CLASS_NAME = "org.apache.nifi.security.kms.FileBasedKeyProvider";
 
-    private static final String LEGACY_SKP_FQCN = "org.apache.nifi.provenance.StaticKeyProvider";
-    private static final String LEGACY_FBKP_FQCN = "org.apache.nifi.provenance.FileBasedKeyProvider";
+    // TODO: Move to RepositoryEncryptionUtils in NIFI-6617
+    public static final String LEGACY_SKP_FQCN = "org.apache.nifi.provenance.StaticKeyProvider";
+    public static final String LEGACY_FBKP_FQCN = "org.apache.nifi.provenance.FileBasedKeyProvider";
 
     private static final String RELATIVE_NIFI_PROPS_PATH = "conf/nifi.properties";
     private static final String BOOTSTRAP_KEY_PREFIX = "nifi.bootstrap.sensitive.key=";
@@ -67,7 +69,8 @@ public class CryptoUtils {
     private static final List<Integer> UNLIMITED_KEY_LENGTHS = Arrays.asList(32, 48, 64);
 
     public static final int IV_LENGTH = 16;
-    private static final String ENCRYPTED_FSR_CLASS_NAME = "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository";
+    public static final String ENCRYPTED_FSR_CLASS_NAME = "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository";
+    public static final String EWAFFR_CLASS_NAME = "org.apache.nifi.controller.repository.crypto.EncryptedWriteAheadFlowFileRepository";
 
     public static boolean isUnlimitedStrengthCryptoAvailable() {
         try {
@@ -110,6 +113,18 @@ public class CryptoUtils {
     }
 
     /**
+     * Returns true if the provided configuration values are valid (shallow evaluation only; does not validate the keys
+     * contained in a {@link FileBasedKeyProvider}).
+     *
+     * @param rec the configuration to validate
+     * @return true if the config is valid
+     */
+    public static boolean isValidRepositoryEncryptionConfiguration(RepositoryEncryptionConfiguration rec) {
+        return isValidKeyProvider(rec.getKeyProviderImplementation(), rec.getKeyProviderLocation(), rec.getEncryptionKeyId(), rec.getEncryptionKeys());
+
+    }
+
+    /**
      * Returns true if the provided configuration values successfully define the specified {@link KeyProvider}.
      *
      * @param keyProviderImplementation the FQ class name of the {@link KeyProvider} implementation
@@ -120,18 +135,18 @@ public class CryptoUtils {
      */
     public static boolean isValidKeyProvider(String keyProviderImplementation, String keyProviderLocation, String keyId, Map<String, String> encryptionKeys) {
         logger.debug("Attempting to validate the key provider: keyProviderImplementation = "
-                + keyProviderImplementation + " , keyProviderLocation = "
-                + keyProviderLocation + " , keyId = "
-                + keyId + " , encryptionKeys = "
+                + keyProviderImplementation + ", keyProviderLocation = "
+                + keyProviderLocation + ", keyId = "
+                + keyId + ", encryptionKeys = "
                 + ((encryptionKeys == null) ? "0" : encryptionKeys.size()));
 
         try {
             keyProviderImplementation = handleLegacyPackages(keyProviderImplementation);
         } catch (KeyManagementException e) {
-            logger.error("The attempt to validate the key provider failed keyProviderImplementation = "
-                    + keyProviderImplementation + " , keyProviderLocation = "
-                    + keyProviderLocation + " , keyId = "
-                    + keyId + " , encryptionKeys = "
+            logger.warn("The attempt to validate the key provider failed keyProviderImplementation = "
+                    + keyProviderImplementation + ", keyProviderLocation = "
+                    + keyProviderLocation + ", keyId = "
+                    + keyId + ", encryptionKeys = "
                     + ((encryptionKeys == null) ? "0" : encryptionKeys.size()));
 
             return false;
@@ -150,10 +165,10 @@ public class CryptoUtils {
             final File kpf = new File(keyProviderLocation);
             return kpf.exists() && kpf.canRead() && StringUtils.isNotEmpty(keyId);
         } else {
-            logger.error("The attempt to validate the key provider failed keyProviderImplementation = "
-                    + keyProviderImplementation + " , keyProviderLocation = "
-                    + keyProviderLocation + " , keyId = "
-                    + keyId + " , encryptionKeys = "
+            logger.warn("The attempt to validate the key provider failed keyProviderImplementation = "
+                    + keyProviderImplementation + ", keyProviderLocation = "
+                    + keyProviderLocation + ", keyId = "
+                    + keyId + ", encryptionKeys = "
                     + ((encryptionKeys == null) ? "0" : encryptionKeys.size()));
 
             return false;
@@ -287,52 +302,6 @@ public class CryptoUtils {
     }
 
     /**
-     * Returns {@code true} if the provenance repository is correctly configured for an
-     * encrypted implementation. Requires the repository implementation to support encryption
-     * and at least one valid key to be configured.
-     *
-     * @param niFiProperties the {@link NiFiProperties} instance to validate
-     * @return true if encryption is successfully configured for the provenance repository
-     */
-    public static boolean isProvenanceRepositoryEncryptionConfigured(NiFiProperties niFiProperties) {
-        final String implementationClassName = niFiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
-        // Referencing EWAPR.class.getName() would require a dependency on the module
-        boolean encryptedRepo = "org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository".equals(implementationClassName);
-        if (encryptedRepo) {
-            return isValidKeyProvider(
-                    niFiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS),
-                    niFiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION),
-                    niFiProperties.getProvenanceRepoEncryptionKeyId(),
-                    niFiProperties.getProvenanceRepoEncryptionKeys());
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Returns {@code true} if the content repository is correctly configured for an encrypted
-     * implementation. Requires the repository implementation to support encryption and at least
-     * one valid key to be configured.
-     *
-     * @param niFiProperties the {@link NiFiProperties} instance to validate
-     * @return true if encryption is successfully configured for the content repository
-     */
-    public static boolean isContentRepositoryEncryptionConfigured(NiFiProperties niFiProperties) {
-        final String implementationClassName = niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
-        // Referencing EFSR.class.getName() would require a dependency on the module
-        boolean encryptedRepo = ENCRYPTED_FSR_CLASS_NAME.equals(implementationClassName);
-        if (encryptedRepo) {
-            return isValidKeyProvider(
-                    niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS),
-                    niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION),
-                    niFiProperties.getContentRepositoryEncryptionKeyId(),
-                    niFiProperties.getContentRepositoryEncryptionKeys());
-        } else {
-            return false;
-        }
-    }
-
-    /**
      * Returns the master key from the {@code bootstrap.conf} file used to encrypt various sensitive properties and data encryption keys.
      *
      * @return the master key
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/EncryptionException.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/EncryptionException.java
index 9e851c2..e4635b5 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/EncryptionException.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/EncryptionException.java
@@ -18,6 +18,9 @@ package org.apache.nifi.security.kms;
 
 import java.security.PrivilegedActionException;
 
+/**
+ * Class used to denote a problem configuring encryption services or encrypting/decrypting data.
+ */
 public class EncryptionException extends Exception {
     /**
      * Constructs a new exception with the specified detail message.  The
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/KeyProviderFactory.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/KeyProviderFactory.java
index 68a4e16..4fffc40 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/KeyProviderFactory.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/kms/KeyProviderFactory.java
@@ -33,7 +33,7 @@ public class KeyProviderFactory {
     /**
      * Returns a key provider instantiated from the configuration values in a {@link RepositoryEncryptionConfiguration} object.
      *
-     * @param rec the data container for config values (usually extracted from {@link org.apache.nifi.util.NiFiProperties})
+     * @param rec       the data container for config values (usually extracted from {@link org.apache.nifi.util.NiFiProperties})
      * @param masterKey the master key used to decrypt wrapped keys
      * @return the configured key provider
      * @throws KeyManagementException if the key provider cannot be instantiated
@@ -49,10 +49,10 @@ public class KeyProviderFactory {
      * Returns a key provider instantiated from the configuration values in a {@link RepositoryEncryptionConfiguration} object.
      *
      * @param implementationClassName the key provider class name
-     * @param keyProviderLocation the filepath/URL of the stored keys
-     * @param keyId the active key id
-     * @param encryptionKeys the available encryption keys
-     * @param masterKey the master key used to decrypt wrapped keys
+     * @param keyProviderLocation     the filepath/URL of the stored keys
+     * @param keyId                   the active key id
+     * @param encryptionKeys          the available encryption keys
+     * @param masterKey               the master key used to decrypt wrapped keys
      * @return the configured key provider
      * @throws KeyManagementException if the key provider cannot be instantiated
      */
@@ -60,7 +60,7 @@ public class KeyProviderFactory {
                                                SecretKey masterKey) throws KeyManagementException {
         KeyProvider keyProvider;
 
-       implementationClassName = CryptoUtils.handleLegacyPackages(implementationClassName);
+        implementationClassName = CryptoUtils.handleLegacyPackages(implementationClassName);
 
         if (StaticKeyProvider.class.getName().equals(implementationClassName)) {
             // Get all the keys (map) from config
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/RepositoryEncryptorUtils.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/RepositoryEncryptorUtils.java
index 4223884..d3f1af1 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/RepositoryEncryptorUtils.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/RepositoryEncryptorUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.security.repository;
 
+import static org.apache.nifi.security.kms.CryptoUtils.isValidKeyProvider;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -27,6 +29,7 @@ import java.util.Arrays;
 import java.util.List;
 import javax.crypto.Cipher;
 import javax.crypto.SecretKey;
+import org.apache.nifi.security.kms.CryptoUtils;
 import org.apache.nifi.security.kms.EncryptionException;
 import org.apache.nifi.security.kms.KeyProvider;
 import org.apache.nifi.security.kms.KeyProviderFactory;
@@ -48,6 +51,7 @@ public class RepositoryEncryptorUtils {
     private static final List<String> SUPPORTED_VERSIONS = Arrays.asList(VERSION);
     private static final int MIN_METADATA_LENGTH = IV_LENGTH + 3 + 3; // 3 delimiters and 3 non-zero elements
     private static final int METADATA_DEFAULT_LENGTH = (20 + 17 + IV_LENGTH + VERSION.length()) * 2; // Default to twice the expected length
+    private static final String EWAPR_CLASS_NAME = "org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository";
 
     // TODO: Add Javadoc
 
@@ -108,14 +112,216 @@ public class RepositoryEncryptorUtils {
         return Arrays.copyOfRange(encryptedRecord, cipherBytesStart, encryptedRecord.length);
     }
 
+    /**
+     * Returns {@code true} if the specified repository is correctly configured for an
+     * encrypted implementation. Requires the repository implementation to support encryption
+     * and at least one valid key to be configured.
+     *
+     * @param niFiProperties the {@link NiFiProperties} instance to validate
+     * @param repositoryType the specific repository configuration to check
+     * @return true if encryption is successfully configured for the specified repository
+     */
+    public static boolean isRepositoryEncryptionConfigured(NiFiProperties niFiProperties, RepositoryType repositoryType) {
+        switch (repositoryType) {
+            case CONTENT:
+                return isContentRepositoryEncryptionConfigured(niFiProperties);
+            case PROVENANCE:
+                return isProvenanceRepositoryEncryptionConfigured(niFiProperties);
+            case FLOWFILE:
+                return isFlowFileRepositoryEncryptionConfigured(niFiProperties);
+            default:
+                logger.warn("Repository encryption configuration validation attempted for {}, an invalid repository type", repositoryType);
+                return false;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the provenance repository is correctly configured for an
+     * encrypted implementation. Requires the repository implementation to support encryption
+     * and at least one valid key to be configured.
+     *
+     * @param niFiProperties the {@link NiFiProperties} instance to validate
+     * @return true if encryption is successfully configured for the provenance repository
+     */
+    static boolean isProvenanceRepositoryEncryptionConfigured(NiFiProperties niFiProperties) {
+        final String implementationClassName = niFiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+        // Referencing EWAPR.class.getName() would require a dependency on the module
+        boolean encryptedRepo = EWAPR_CLASS_NAME.equals(implementationClassName);
+        if (encryptedRepo) {
+            return isValidKeyProvider(
+                    niFiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS),
+                    niFiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION),
+                    niFiProperties.getProvenanceRepoEncryptionKeyId(),
+                    niFiProperties.getProvenanceRepoEncryptionKeys());
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the content repository is correctly configured for an encrypted
+     * implementation. Requires the repository implementation to support encryption and at least
+     * one valid key to be configured.
+     *
+     * @param niFiProperties the {@link NiFiProperties} instance to validate
+     * @return true if encryption is successfully configured for the content repository
+     */
+    static boolean isContentRepositoryEncryptionConfigured(NiFiProperties niFiProperties) {
+        final String implementationClassName = niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+        // Referencing EFSR.class.getName() would require a dependency on the module
+        boolean encryptedRepo = CryptoUtils.ENCRYPTED_FSR_CLASS_NAME.equals(implementationClassName);
+        if (encryptedRepo) {
+            return isValidKeyProvider(
+                    niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS),
+                    niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION),
+                    niFiProperties.getContentRepositoryEncryptionKeyId(),
+                    niFiProperties.getContentRepositoryEncryptionKeys());
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Returns {@code true} if the flowfile repository is correctly configured for an encrypted
+     * implementation. Requires the repository implementation to support encryption and at least
+     * one valid key to be configured.
+     *
+     * @param niFiProperties the {@link NiFiProperties} instance to validate
+     * @return true if encryption is successfully configured for the flowfile repository
+     */
+    static boolean isFlowFileRepositoryEncryptionConfigured(NiFiProperties niFiProperties) {
+        final String implementationClassName = niFiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+        boolean encryptedRepo = CryptoUtils.EWAFFR_CLASS_NAME.equals(implementationClassName);
+        if (encryptedRepo) {
+            return isValidKeyProvider(
+                    niFiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS),
+                    niFiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION),
+                    niFiProperties.getFlowFileRepoEncryptionKeyId(),
+                    niFiProperties.getFlowFileRepoEncryptionKeys());
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Returns a configured {@link KeyProvider} instance that does not require a {@code master key} to use (usually a {@link org.apache.nifi.security.kms.StaticKeyProvider}).
+     *
+     * @param niFiProperties the {@link NiFiProperties} object
+     * @param repositoryType the {@link RepositoryType} indicator
+     * @return the configured KeyProvider
+     * @throws KeyManagementException if there is a problem with the configuration
+     */
+    private static KeyProvider buildKeyProvider(NiFiProperties niFiProperties, RepositoryType repositoryType) throws KeyManagementException {
+        return buildKeyProvider(niFiProperties, null, repositoryType);
+    }
+
+    /**
+     * Returns a configured {@link KeyProvider} instance that requires a {@code master key} to use
+     * (usually a {@link org.apache.nifi.security.kms.FileBasedKeyProvider} or an encrypted
+     * {@link org.apache.nifi.security.kms.StaticKeyProvider}).
+     *
+     * @param niFiProperties the {@link NiFiProperties} object
+     * @param masterKey      the master encryption key used to encrypt the data encryption keys in the key provider configuration
+     * @param repositoryType the {@link RepositoryType} indicator
+     * @return the configured KeyProvider
+     * @throws KeyManagementException if there is a problem with the configuration
+     */
     public static KeyProvider buildKeyProvider(NiFiProperties niFiProperties, SecretKey masterKey, RepositoryType repositoryType) throws KeyManagementException {
         RepositoryEncryptionConfiguration rec = RepositoryEncryptionConfiguration.fromNiFiProperties(niFiProperties, repositoryType);
 
+        return buildKeyProviderFromConfig(masterKey, rec);
+    }
+
+    /**
+     * Returns a configured {@link KeyProvider} instance given the {@link RepositoryEncryptionConfiguration}.
+     *
+     * @param masterKey the master encryption key used to encrypt the data encryption keys in the key provider configuration
+     * @param rec       the repository-specific encryption configuration
+     * @return the configured KeyProvider
+     * @throws KeyManagementException if there is a problem with the configuration
+     */
+    public static KeyProvider buildKeyProviderFromConfig(SecretKey masterKey, RepositoryEncryptionConfiguration rec) throws KeyManagementException {
         if (rec.getKeyProviderImplementation() == null) {
+            final String keyProviderImplementationClass = determineKeyProviderImplementationClassName(rec.getRepositoryType());
             throw new KeyManagementException("Cannot create key provider because the NiFi properties are missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS);
+                    + keyProviderImplementationClass);
         }
 
         return KeyProviderFactory.buildKeyProvider(rec, masterKey);
     }
+
+    /**
+     * Utility method which returns the {@link NiFiProperties} property key used to configure the {@link KeyProvider} implementation class for a given repository type.
+     *
+     * @param repositoryType the {@link RepositoryType} indicator
+     * @return the key provider implementation property key name, or {@code "no_such_key_provider_defined"} for unsupported repository types
+     */
+    static String determineKeyProviderImplementationClassName(RepositoryType repositoryType) {
+        // TODO: Change to build string directly using repository type packagePath property or universal in NIFI-6617
+        if (repositoryType == null) {
+            logger.warn("Could not determine key provider implementation class name for null repository");
+            return "no_such_key_provider_defined";
+        }
+        switch (repositoryType) {
+            case FLOWFILE:
+                return NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS;
+            case CONTENT:
+                return NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS;
+            case PROVENANCE:
+                return NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS;
+            default:
+                logger.warn("Could not determine key provider implementation class name for " + repositoryType.getName());
+                return "no_such_key_provider_defined";
+        }
+    }
+
+    /**
+     * Returns a configured {@link KeyProvider} instance for the specified repository type given the configuration values in {@code nifi.properties}.
+     *
+     * @param niFiProperties the {@link NiFiProperties} object
+     * @param repositoryType the {@link RepositoryType} indicator
+     * @return the configured KeyProvider
+     * @throws IOException if there is a problem reading the properties or they are not valid &amp; complete
+     */
+    public static KeyProvider validateAndBuildRepositoryKeyProvider(NiFiProperties niFiProperties, RepositoryType repositoryType) throws IOException {
+        // Initialize the encryption-specific fields
+        if (isRepositoryEncryptionConfigured(niFiProperties, repositoryType)) {
+            try {
+                KeyProvider keyProvider;
+                final String keyProviderImplementation = niFiProperties.getProperty(determineKeyProviderImplementationClassName(repositoryType));
+                if (KeyProviderFactory.requiresMasterKey(keyProviderImplementation)) {
+                    SecretKey masterKey = CryptoUtils.getMasterKey();
+                    keyProvider = buildKeyProvider(niFiProperties, masterKey, repositoryType);
+                } else {
+                    keyProvider = buildKeyProvider(niFiProperties, repositoryType);
+                }
+                return keyProvider;
+            } catch (KeyManagementException e) {
+                String msg = "Encountered an error building the key provider";
+                logger.error(msg, e);
+                throw new IOException(msg, e);
+            }
+        } else {
+            throw new IOException("The provided configuration does not support an encrypted " + repositoryType.getName());
+        }
+    }
+
+    /**
+     * Returns a configured {@link KeyProvider} instance for the specified repository type given the configuration values.
+     *
+     * @param repositoryEncryptionConfiguration the {@link RepositoryEncryptionConfiguration} object
+     * @return the configured KeyProvider
+     * @throws IOException if there is a problem reading the properties or they are not valid &amp; complete
+     */
+    public static KeyProvider validateAndBuildRepositoryKeyProvider(RepositoryEncryptionConfiguration repositoryEncryptionConfiguration) throws IOException {
+        // Initialize the encryption-specific fields
+        try {
+            SecretKey masterKey = KeyProviderFactory.requiresMasterKey(repositoryEncryptionConfiguration.getKeyProviderImplementation()) ? CryptoUtils.getMasterKey() : null;
+            return buildKeyProviderFromConfig(masterKey, repositoryEncryptionConfiguration);
+        } catch (KeyManagementException e) {
+            String msg = "Encountered an error building the key provider";
+            logger.error(msg, e);
+            throw new IOException(msg, e);
+        }
+    }
 }
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/block/aes/RepositoryObjectAESGCMEncryptor.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/block/aes/RepositoryObjectAESGCMEncryptor.java
index 080d543..17de402 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/block/aes/RepositoryObjectAESGCMEncryptor.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/block/aes/RepositoryObjectAESGCMEncryptor.java
@@ -89,14 +89,15 @@ public class RepositoryObjectAESGCMEncryptor extends AbstractAESEncryptor implem
                 // Serialize and concat encryption details fields (keyId, algo, IV, version, CB length) outside of encryption
                 RepositoryObjectEncryptionMetadata metadata = new BlockEncryptionMetadata(keyId, ALGORITHM, ivBytes, VERSION, cipherBytes.length);
                 byte[] serializedEncryptionMetadata = RepositoryEncryptorUtils.serializeEncryptionMetadata(metadata);
+                logger.debug("Generated encryption metadata ({} bytes) for repository object {}", serializedEncryptionMetadata.length, recordId);
 
                 // Add the sentinel byte of 0x01
                 // TODO: Remove (required for prov repo but not FF repo)
-                logger.debug("Encrypted provenance event record " + recordId + " with key ID " + keyId);
+                logger.debug("Encrypted repository object " + recordId + " with key ID " + keyId);
                 // return CryptoUtils.concatByteArrays(SENTINEL, serializedEncryptionMetadata, cipherBytes);
                 return CryptoUtils.concatByteArrays(serializedEncryptionMetadata, cipherBytes);
             } catch (EncryptionException | BadPaddingException | IllegalBlockSizeException | IOException | KeyManagementException e) {
-                final String msg = "Encountered an exception encrypting provenance record " + recordId;
+                final String msg = "Encountered an exception encrypting repository object " + recordId;
                 logger.error(msg, e);
                 throw new EncryptionException(msg, e);
             }
@@ -113,7 +114,7 @@ public class RepositoryObjectAESGCMEncryptor extends AbstractAESEncryptor implem
      */
     @Override
     public byte[] decrypt(byte[] encryptedRecord, String recordId) throws EncryptionException {
-        RepositoryObjectEncryptionMetadata metadata = prepareObjectForDecryption(encryptedRecord, recordId, "provenance record", SUPPORTED_VERSIONS);
+        RepositoryObjectEncryptionMetadata metadata = prepareObjectForDecryption(encryptedRecord, recordId, "repository object", SUPPORTED_VERSIONS);
 
         // TODO: Actually use the version to determine schema, etc.
 
@@ -121,7 +122,7 @@ public class RepositoryObjectAESGCMEncryptor extends AbstractAESEncryptor implem
             throw new EncryptionException("The requested key ID " + metadata.keyId + " is not available");
         } else {
             try {
-                logger.debug("Decrypting provenance record " + recordId + " with key ID " + metadata.keyId);
+                logger.debug("Decrypting repository object " + recordId + " with key ID " + metadata.keyId);
                 EncryptionMethod method = EncryptionMethod.forAlgorithm(metadata.algorithm);
                 Cipher cipher = RepositoryEncryptorUtils.initCipher(aesKeyedCipherProvider, method, Cipher.DECRYPT_MODE, keyProvider.getKey(metadata.keyId), metadata.ivBytes);
 
@@ -131,10 +132,10 @@ public class RepositoryObjectAESGCMEncryptor extends AbstractAESEncryptor implem
                 // Perform the actual decryption
                 byte[] plainBytes = cipher.doFinal(cipherBytes);
 
-                logger.debug("Decrypted provenance event record " + recordId + " with key ID " + metadata.keyId);
+                logger.debug("Decrypted repository object " + recordId + " with key ID " + metadata.keyId);
                 return plainBytes;
             } catch (EncryptionException | BadPaddingException | IllegalBlockSizeException | KeyManagementException e) {
-                final String msg = "Encountered an exception decrypting provenance record " + recordId;
+                final String msg = "Encountered an exception decrypting repository object " + recordId;
                 logger.error(msg, e);
                 throw new EncryptionException(msg, e);
             }
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/FlowFileRepositoryEncryptionConfiguration.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/FlowFileRepositoryEncryptionConfiguration.java
new file mode 100644
index 0000000..74e4132
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/FlowFileRepositoryEncryptionConfiguration.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.repository.config;
+
+import java.util.Map;
+import org.apache.nifi.security.repository.RepositoryType;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowFileRepositoryEncryptionConfiguration extends RepositoryEncryptionConfiguration {
+    private static final Logger logger = LoggerFactory.getLogger(FlowFileRepositoryEncryptionConfiguration.class);
+
+    /**
+     * Constructor which accepts a {@link NiFiProperties} object and extracts the relevant
+     * property values directly.
+     *
+     * @param niFiProperties the NiFi properties
+     */
+    public FlowFileRepositoryEncryptionConfiguration(NiFiProperties niFiProperties) {
+        this(niFiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS),
+                niFiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION),
+                niFiProperties.getFlowFileRepoEncryptionKeyId(),
+                niFiProperties.getFlowFileRepoEncryptionKeys(),
+                niFiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION)
+        );
+    }
+
+    /**
+     * Constructor which accepts explicit values for each configuration value. This differs
+     * from {@link ContentRepositoryEncryptionConfiguration} and {@link ProvenanceRepositoryEncryptionConfiguration} because the repository implementation
+     * does not change for an encrypted flowfile repository, only the write-ahead log
+     * implementation ({@link NiFiProperties#FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION}).
+     *
+     * @param keyProviderImplementation the key provider implementation class
+     * @param keyProviderLocation the key provider location
+     * @param encryptionKeyId the active encryption key id
+     * @param encryptionKeys the map of available keys
+     * @param repositoryImplementation the write ahead log implementation
+     */
+    public FlowFileRepositoryEncryptionConfiguration(String keyProviderImplementation,
+                                                     String keyProviderLocation,
+                                                     String encryptionKeyId,
+                                                     Map<String, String> encryptionKeys,
+                                                     String repositoryImplementation) {
+        this.keyProviderImplementation = keyProviderImplementation;
+        this.keyProviderLocation = keyProviderLocation;
+        this.encryptionKeyId = encryptionKeyId;
+        this.encryptionKeys = encryptionKeys;
+        this.repositoryImplementation = repositoryImplementation;
+        this.repositoryType = RepositoryType.FLOWFILE;
+    }
+}
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/RepositoryEncryptionConfiguration.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/RepositoryEncryptionConfiguration.java
index 250e96b..8f4de83 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/RepositoryEncryptionConfiguration.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/repository/config/RepositoryEncryptionConfiguration.java
@@ -107,6 +107,7 @@ public abstract class RepositoryEncryptionConfiguration {
             case PROVENANCE:
                 return new ProvenanceRepositoryEncryptionConfiguration(niFiProperties);
             case FLOWFILE:
+                return new FlowFileRepositoryEncryptionConfiguration(niFiProperties);
             default:
                 throw new IllegalArgumentException("The specified repository does not support encryption");
         }
diff --git a/nifi-commons/nifi-security-utils/src/test/groovy/org/apache/nifi/security/repository/RepositoryEncryptorUtilsTest.groovy b/nifi-commons/nifi-security-utils/src/test/groovy/org/apache/nifi/security/repository/RepositoryEncryptorUtilsTest.groovy
new file mode 100644
index 0000000..7b8b67b
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/test/groovy/org/apache/nifi/security/repository/RepositoryEncryptorUtilsTest.groovy
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.repository
+
+
+import org.apache.nifi.security.kms.StaticKeyProvider
+import org.apache.nifi.security.util.EncryptionMethod
+import org.apache.nifi.security.util.crypto.AESKeyedCipherProvider
+import org.apache.nifi.util.NiFiProperties
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.AfterClass
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import javax.crypto.Cipher
+import javax.crypto.SecretKey
+import javax.crypto.spec.IvParameterSpec
+import javax.crypto.spec.SecretKeySpec
+import java.security.Security
+
+@RunWith(JUnit4.class)
+class RepositoryEncryptorUtilsTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(RepositoryEncryptorUtilsTest.class)
+
+    private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
+    private static final String KEY_HEX_256 = KEY_HEX_128 * 2
+    private static final String KEY_HEX_1 = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
+
+    private static final String KEY_HEX_2 = "00" * (isUnlimitedStrengthCryptoAvailable() ? 32 : 16)
+    private static final String KEY_HEX_3 = "AA" * (isUnlimitedStrengthCryptoAvailable() ? 32 : 16)
+
+    private static final String KEY_ID_1 = "K1"
+    private static final String KEY_ID_2 = "K2"
+    private static final String KEY_ID_3 = "K3"
+
+    private static AESKeyedCipherProvider mockCipherProvider
+
+    private static String ORIGINAL_LOG_LEVEL
+
+    private NiFiProperties nifiProperties
+    private static final String LOG_PACKAGE = "org.slf4j.simpleLogger.log.org.apache.nifi.controller.repository.crypto"
+
+    private static final boolean isLossTolerant = false
+
+    // Mapping of key IDs to keys
+    final def KEYS = [
+            (KEY_ID_1): new SecretKeySpec(Hex.decode(KEY_HEX_1), "AES"),
+            (KEY_ID_2): new SecretKeySpec(Hex.decode(KEY_HEX_2), "AES"),
+            (KEY_ID_3): new SecretKeySpec(Hex.decode(KEY_HEX_3), "AES"),
+    ]
+    private static final String DEFAULT_NIFI_PROPS_PATH = "/conf/nifi.properties"
+
+    private static final Map<String, String> DEFAULT_ENCRYPTION_PROPS = [
+            (NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION)                              : "org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository",
+            (NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_ID)                           : KEY_ID_1,
+            (NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY)                              : KEY_HEX_1,
+            (NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
+            (NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION)            : ""
+    ]
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        ORIGINAL_LOG_LEVEL = System.getProperty(LOG_PACKAGE)
+        System.setProperty(LOG_PACKAGE, "DEBUG")
+
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+
+        mockCipherProvider = [
+                getCipher: { EncryptionMethod em, SecretKey key, byte[] ivBytes, boolean encryptMode ->
+                    logger.mock("Getting cipher for ${em} with IV ${Hex.toHexString(ivBytes)} encrypt ${encryptMode}")
+                    Cipher cipher = Cipher.getInstance(em.algorithm)
+                    cipher.init((encryptMode ? Cipher.ENCRYPT_MODE : Cipher.DECRYPT_MODE) as int, key, new IvParameterSpec(ivBytes))
+                    cipher
+                }
+        ] as AESKeyedCipherProvider
+    }
+
+    @Before
+    void setUp() throws Exception {
+    }
+
+    @After
+    void tearDown() throws Exception {
+    }
+
+    @AfterClass
+    static void tearDownOnce() throws Exception {
+        if (ORIGINAL_LOG_LEVEL) {
+            System.setProperty(LOG_PACKAGE, ORIGINAL_LOG_LEVEL)
+        }
+    }
+
+    private static boolean isUnlimitedStrengthCryptoAvailable() {
+        Cipher.getMaxAllowedKeyLength("AES") > 128
+    }
+
+    @Test
+    void testShouldDetermineKeyProviderImplementationClassName() {
+        // Arrange
+        final Map EXPECTED_CLASS_NAMES = [
+                (RepositoryType.CONTENT)   : NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
+                (RepositoryType.FLOWFILE)  : NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
+                (RepositoryType.PROVENANCE): NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS,
+        ]
+
+        // Act
+        Map<RepositoryType, String> actualClassNames = RepositoryType.values().collectEntries { RepositoryType rt ->
+            [rt, RepositoryEncryptorUtils.determineKeyProviderImplementationClassName(rt)]
+        }
+
+        // Assert
+        actualClassNames.each { RepositoryType rt, String actualClassName ->
+            assert actualClassName == EXPECTED_CLASS_NAMES[rt]
+        }
+    }
+
+    @Test
+    void testDetermineKeyProviderImplementationClassNameShouldHandleUnsupportedRepositoryTypes() {
+        // Arrange
+
+        // Act
+        def actualClassName = RepositoryEncryptorUtils.determineKeyProviderImplementationClassName(null)
+
+        // Assert
+        assert actualClassName == "no_such_key_provider_defined"
+    }
+}
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
index 1468d49..2a676e7 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/ByteArrayDataOutputStream.java
@@ -30,8 +30,8 @@ public class ByteArrayDataOutputStream {
     private final ByteArrayOutputStream baos;
     private final DataOutputStream dos;
 
-    public ByteArrayDataOutputStream(final int intiialBufferSize) {
-        this.baos = new ByteArrayOutputStream(intiialBufferSize);
+    public ByteArrayDataOutputStream(final int initialBufferSize) {
+        this.baos = new ByteArrayOutputStream(initialBufferSize);
         this.dos = new DataOutputStream(baos);
     }
 
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
index c54f505..b3fb267 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.wali;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDeFactory;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -38,6 +32,11 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDeFactory;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
 
 /**
  * <p>
@@ -64,7 +63,7 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
 
     private final File storageDirectory;
     private final File journalsDirectory;
-    private final SerDeFactory<T> serdeFactory;
+    protected final SerDeFactory<T> serdeFactory;
     private final SyncListener syncListener;
     private final Set<String> recoveredSwapLocations = new HashSet<>();
 
@@ -300,10 +299,10 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
             // that we could have an empty journal file already created. If this happens, we don't want to create
             // a new file on top of it because it would get deleted below when we clean up old journals. So we
             // will simply increment our transaction ID and try again.
-            File journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
+            File journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
             while (journalFile.exists()) {
                 nextTransactionId++;
-                journalFile = new File(journalsDirectory, String.valueOf(nextTransactionId) + ".journal");
+                journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
             }
 
             journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
@@ -325,7 +324,7 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
         final long totalNanos = System.nanoTime() - startNanos;
         final long millis = TimeUnit.NANOSECONDS.toMillis(totalNanos);
         logger.info("Checkpointed Write-Ahead Log with {} Records and {} Swap Files in {} milliseconds (Stop-the-world time = {} milliseconds), max Transaction ID {}",
-            new Object[] {snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId()});
+                snapshotCapture.getRecords().size(), snapshotCapture.getSwapLocations().size(), millis, stopTheWorldMillis, snapshotCapture.getMaxTransactionId());
 
         return snapshotCapture.getRecords().size();
     }
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index fc07438..629eb3d 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2512,7 +2512,7 @@ NOTE: Switching repository implementations should only be done on an instance wi
 
 === Write Ahead FlowFile Repository
 
-WriteAheadFlowFileRepository is the default implementation.  It persists FlowFiles to disk, and can optionally be configured to synchronize all changes to disk. This is very expensive and can significantly reduce NiFi performance. However, if it is `false`, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is `false`.
+`WriteAheadFlowFileRepository` is the default implementation.  It persists FlowFiles to disk, and can optionally be configured to synchronize all changes to disk. This is very expensive and can significantly reduce NiFi performance. However, if it is `false`, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is `false`.
 
 |====
 |*Property*|*Description*
@@ -2521,7 +2521,7 @@ Write-Ahead Log should be used. The default value is `org.apache.nifi.wali.Seque
 in order to address an issue that exists in the older implementation. In the event of power loss or an operating system crash, the old implementation was susceptible to recovering FlowFiles
 incorrectly. This could potentially lead to the wrong attributes or content being assigned to a FlowFile upon restart, following the power loss or OS crash. However, one can still choose to opt into
 using the previous implementation and accept that risk, if desired (for example, if the new implementation were to exhibit some unexpected error).
-To do so, set the value of this property to `org.wali.MinimalLockingWriteAheadLog`.
+To do so, set the value of this property to `org.wali.MinimalLockingWriteAheadLog`. Another available implementation is `org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog`.
 If the value of this property is changed, upon restart, NiFi will still recover the records written using the previously configured repository and delete the files written by the previously configured
 implementation.
 |`nifi.flowfile.repository.directory`*|The location of the FlowFile Repository. The default value is `./flowfile_repository`.
@@ -2530,6 +2530,32 @@ implementation.
 |`nifi.flowfile.repository.always.sync`|If set to `true`, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is `false`, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is `false`.
 |====
 
+[[encrypted-write-ahead-flowfile-repository-properties]]
+=== Encrypted Write Ahead FlowFile Repository Properties
+
+All of the properties defined above (see <<write-ahead-flowfile-repository,Write Ahead FlowFile Repository>>) still apply. Only encryption-specific properties are listed here. See <<user-guide.adoc#encrypted-flowfile,Encrypted FlowFile Repository in the User Guide>> for more information.
+
+NOTE: Unlike the encrypted content and provenance repositories, the repository implementation does not change here, only the _underlying write-ahead log implementation_. This allows for cleaner separation and more flexibility in implementation selection. The property that should be changed to enable encryption is `nifi.flowfile.repository.wal.implementation`.
+
+|====
+|*Property*|*Description*
+|`nifi.flowfile.repository.encryption.key.provider.implementation`|This is the fully-qualified class name of the **key provider**. A key provider is the datastore interface for accessing the encryption key to protect the content claims. There are currently two implementations -- `StaticKeyProvider` which reads a key directly from _nifi.properties_, and `FileBasedKeyProvider` which reads *n* many keys from an encrypted file. The interface is extensible, and HSM-backed or other providers a [...]
+|`nifi.flowfile.repository.encryption.key.provider.location`|The path to the key definition resource (empty for `StaticKeyProvider`, `./keys.nkp` or similar path for `FileBasedKeyProvider`). For future providers like an HSM, this may be a connection string or URL.
+|`nifi.flowfile.repository.encryption.key.id`|The active key ID to use for encryption (e.g. `Key1`).
+|`nifi.flowfile.repository.encryption.key`|The key to use for `StaticKeyProvider`. The key format is hex-encoded (`0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210`) but can also be encrypted using the `./encrypt-config.sh` tool in NiFi Toolkit (see the <<toolkit-guide.adoc#encrypt_config_tool,Encrypt-Config Tool>> section in the link:toolkit-guide.html[NiFi Toolkit Guide] for more information).
+|`nifi.flowfile.repository.encryption.key.id.`*|Allows for additional keys to be specified for the `StaticKeyProvider`. For example, the line `nifi.flowfile.repository.encryption.key.id.Key2=012...210` would provide an available key `Key2`.
+|====
+
+The simplest configuration is below:
+
+....
+nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
+nifi.flowfile.repository.wal.implementation=org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog
+nifi.flowfile.repository.encryption.key.provider.implementation=org.apache.nifi.security.kms.StaticKeyProvider
+nifi.flowfile.repository.encryption.key.provider.location=
+nifi.flowfile.repository.encryption.key.id=Key1
+nifi.flowfile.repository.encryption.key=0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210
+....
 
 === Volatile FlowFile Repository
 
@@ -2711,7 +2737,7 @@ The simplest configuration is below:
 nifi.content.repository.implementation=org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository
 nifi.content.repository.encryption.key.provider.implementation=org.apache.nifi.security.kms.StaticKeyProvider
 nifi.content.repository.encryption.key.provider.location=
-nifi.content.repository.encryption.key.id=K1
+nifi.content.repository.encryption.key.id=Key1
 nifi.content.repository.encryption.key=0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210
 ....
 
diff --git a/nifi-docs/src/main/asciidoc/images/encrypted-flowfile-hex.png b/nifi-docs/src/main/asciidoc/images/encrypted-flowfile-hex.png
new file mode 100644
index 0000000..71833d7
Binary files /dev/null and b/nifi-docs/src/main/asciidoc/images/encrypted-flowfile-hex.png differ
diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc
index 396cc99..1bf6c8a 100644
--- a/nifi-docs/src/main/asciidoc/user-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/user-guide.adoc
@@ -2660,6 +2660,7 @@ key5=c6FzfnKm7UR7xqI2NFpZ+fEKBfSU7+1NvRw+XWQ9U39MONWqk5gvoyOCdFR1kUgeg46jrN5dGXk
 
 Each line defines a key ID and then the Base64-encoded cipher text of a 16 byte IV and wrapped AES-128, AES-192, or AES-256 key depending on the JCE policies available. The individual keys are wrapped by AES/GCM encryption using the **master key** defined by `nifi.bootstrap.sensitive.key` in _conf/bootstrap.conf_.
 
+[[provenance-repository-key-rotation]]
 ===== Key Rotation
 Simply update _nifi.properties_ to reference a new key ID in `nifi.provenance.repository.encryption.key.id`. Previously-encrypted events can still be decrypted as long as that key is still available in the key definition file or `nifi.provenance.repository.encryption.key.id.<OldKeyID>` as the key ID is serialized alongside the encrypted record.
 
@@ -2682,15 +2683,15 @@ When switching between implementation "families" (i.e. `VolatileProvenanceReposi
 
 * Switching between unencrypted and encrypted repositories
 ** If a user has an existing repository (`WriteAheadProvenanceRepository` only -- **not** `PersistentProvenanceRepository`) that is not encrypted and switches their configuration to use an encrypted repository, the application writes an error to the log but starts up. However, previous events are not accessible through the provenance query interface and new events will overwrite the existing events. The same behavior occurs if a user switches from an encrypted repository to an unencrypte [...]
-*** Encrypted -> unencrypted -- if the previous repository implementation was encrypted, these events should be handled seamlessly as long as the key provider available still has the keys used to encrypt the events (see **Key Rotation**)
+*** Encrypted -> unencrypted -- if the previous repository implementation was encrypted, these events should be handled seamlessly as long as the key provider available still has the keys used to encrypt the events (see <<provenance-repository-key-rotation,Key Rotation>>)
 *** Unencrypted -> encrypted -- if the previous repository implementation was unencrypted, these events should be handled seamlessly as the previously recorded events simply need to be read with a plaintext schema record reader and then written back with the encrypted record writer
 ** There is also a future effort to provide a standalone tool in NiFi Toolkit to encrypt/decrypt an existing provenance repository to make the transition easier. The translation process could take a long time depending on the size of the existing repository, and being able to perform this task outside of application startup would be valuable (link:https://issues.apache.org/jira/browse/NIFI-3723[NIFI-3723^]).
 * Multiple repositories -- No additional effort or testing has been applied to multiple repositories at this time. It is possible/likely issues will occur with repositories on different physical devices. There is no option to provide a heterogenous environment (i.e. one encrypted, one plaintext repository).
 * Corruption -- when a disk is filled or corrupted, there have been reported issues with the repository becoming corrupted and recovery steps are necessary. This is likely to continue to be an issue with the encrypted repository, although still limited in scope to individual records (i.e. an entire repository file won't be irrecoverable due to the encryption).
 
 [[encrypted-content]]
-=== Encrypted Content Repository
-While OS-level access control can offer some security over the flowfile content data written to the disk in a repository, there are scenarios where the data may be sensitive, compliance and regulatory requirements exist, or NiFi is running on hardware not under the direct control of the organization (cloud, etc.). In this case, the content repository allows for all data to be encrypted before being persisted to the disk.
+== Encrypted Content Repository
+While OS-level access control can offer some security over the flowfile content data written to the disk in a repository, there are scenarios where the data may be sensitive, compliance and regulatory requirements exist, or NiFi is running on hardware not under the direct control of the organization (cloud, etc.). In this case, the content repository allows for all data to be encrypted before being persisted to the disk. For more information on the internal workings of the content reposi [...]
 
 [WARNING]
 .Experimental
@@ -2704,17 +2705,17 @@ This implementation is marked <<experimental_warning, *experimental*>> as of Apa
 The current implementation of the encrypted content repository intercepts the serialization of content data via the `EncryptedContentRepositoryOutputStream` and uses the `AES/CTR` algorithm, which is fairly performant on commodity hardware. This use of a stream cipher (because the content is operated on in a streaming manner for performance) differs from the use of an authenticated encryption algorithm (AEAD) like `AES/GCM` in the <<encrypted-provenance,Encrypted Provenance Repository>>. [...]
 ============
 
-==== What is it?
+=== What is it?
 
 The `EncryptedFileSystemRepository` is a new implementation of the content repository which encrypts all content data before it is written to the repository. This allows for storage on systems where OS-level access controls are not sufficient to protect the data while still allowing querying and access to the data through the NiFi UI/API.
 
-==== How does it work?
+=== How does it work?
 
 The `FileSystemRepository` was introduced in NiFi 0.2.1 and provided the only persistent content repository implementation. The encrypted version wraps that implementation with functionality to return to the `Session` (usually `StandardProcessSession`) a special `OutputStream`/`InputStream` which encrypt and decrypt the serialized bytes respectively. This allows all components to continue interacting with the content repository interface in the same way as before and continue operating o [...]
 
 The fully qualified class `org.apache.nifi.content.EncryptedFileSystemRepository` is specified as the content repository implementation in _nifi.properties_ as the value of `nifi.content.repository.implementation`. In addition, <<administration-guide.adoc#encrypted-file-system-content-repository-properties,new properties>> must be populated to allow successful initialization.
 
-===== StaticKeyProvider
+==== StaticKeyProvider
 The `StaticKeyProvider` implementation defines keys directly in _nifi.properties_. Individual keys are provided in hexadecimal encoding. The keys can also be encrypted like any other sensitive property in _nifi.properties_ using the <<administration-guide.adoc#encrypt-config_tool,`./encrypt-config.sh`>> tool in the NiFi Toolkit.
 
 The following configuration section would result in a key provider with two available keys, "Key1" (active) and "AnotherKey".
@@ -2725,7 +2726,7 @@ nifi.content.repository.encryption.key=0123456789ABCDEFFEDCBA9876543210012345678
 nifi.content.repository.encryption.key.id.AnotherKey=0101010101010101010101010101010101010101010101010101010101010101
 ....
 
-===== FileBasedKeyProvider
+==== FileBasedKeyProvider
 The `FileBasedKeyProvider` implementation reads from an encrypted definition file of the format:
 
 ....
@@ -2745,10 +2746,11 @@ Tool_ or `FileBasedKeyProvider`, those _keys_ will be protected using `AES/GCM`
 over the key material.
 ****
 
-===== Key Rotation
+[[content-repository-key-rotation]]
+==== Key Rotation
 Simply update _nifi.properties_ to reference a new key ID in `nifi.content.repository.encryption.key.id`. Previously-encrypted content claims can still be decrypted as long as that key is still available in the key definition file or `nifi.content.repository.encryption.key.id.<OldKeyID>` as the key ID is serialized alongside the encrypted content.
 
-==== Writing and Reading Content Claims
+=== Writing and Reading Content Claims
 Once the repository is initialized, all content claim write operations are serialized using `RepositoryObjectStreamEncryptor` (the only currently existing implementation is `RepositoryObjectAESCTREncryptor`) to an `OutputStream`. The actual implementation is `EncryptedContentRepositoryOutputStream`, which encrypts the data written by the component via `StandardProcessSession` inline and the encryption metadata (`keyId`, `algorithm`, `version`, `IV`) is serialized and prepended. The compl [...]
 
 image:encrypted-content-hex.png["Encrypted content repository file on disk"]
@@ -2757,7 +2759,7 @@ On content claim read, the process is reversed. The encryption metadata (`Reposi
 
 Within the NiFi UI/API, there is no detectable difference between an encrypted and unencrypted content repository. The Provenance Query operations to view content work as expected with no change to the process.
 
-==== Potential Issues
+=== Potential Issues
 
 [WARNING]
 .Switching Implementations
@@ -2767,12 +2769,93 @@ When switching between implementation "families" (i.e. `VolatileContentRepositor
 
 * Switching between unencrypted and encrypted repositories
 ** If a user has an existing repository (`FileSystemRepository`) that is not encrypted and switches their configuration to use an encrypted repository, the application writes an error to the log but starts up. However, previous content claims are not accessible through the provenance query interface and new content claims will overwrite the existing claims. The same behavior occurs if a user switches from an encrypted repository to an unencrypted repository. Automatic roll-over is a futu [...]
-*** Encrypted -> unencrypted -- if the previous repository implementation was encrypted, these claims should be handled seamlessly as long as the key provider available still has the keys used to encrypt the claims (see **Key Rotation**)
+*** Encrypted -> unencrypted -- if the previous repository implementation was encrypted, these claims should be handled seamlessly as long as the key provider available still has the keys used to encrypt the claims (see <<content-repository-key-rotation,Key Rotation>>)
 *** Unencrypted -> encrypted -- if the previous repository implementation was unencrypted, these claims should be handled seamlessly as the previously written claims simply need to be read with a plaintext `InputStream` and then be written back with the `EncryptedContentRepositoryOutputStream`
 ** There is also a future effort to provide a standalone tool in NiFi Toolkit to encrypt/decrypt an existing content repository to make the transition easier. The translation process could take a long time depending on the size of the existing repository, and being able to perform this task outside of application startup would be valuable (link:https://issues.apache.org/jira/browse/NIFI-6783[NIFI-6783^]).
 * Multiple repositories -- No additional effort or testing has been applied to multiple repositories at this time. It is possible/likely issues will occur with repositories on different physical devices. There is no option to provide a heterogenous environment (i.e. one encrypted, one plaintext repository).
 * Corruption -- when a disk is filled or corrupted, there have been reported issues with the repository becoming corrupted and recovery steps are necessary. This is likely to continue to be an issue with the encrypted repository, although still limited in scope to individual claims (i.e. an entire repository file won't be irrecoverable due to the encryption). Some testing has been performed on scenarios where disk space is exhausted. While the flow can no longer write additional content  [...]
 
+[[encrypted-flowfile]]
+== Encrypted FlowFile Repository
+While OS-level access control can offer some security over the flowfile attribute and content claim data written to the disk in a repository, there are scenarios where the data may be sensitive, compliance and regulatory requirements exist, or NiFi is running on hardware not under the direct control of the organization (cloud, etc.). In this case, the flowfile repository allows for all data to be encrypted before being persisted to the disk. For more information on the internal workings  [...]
+
+[WARNING]
+.Experimental
+============
+This implementation is marked <<experimental_warning, *experimental*>> as of Apache NiFi 1.11.0 (January 2020). The API, configuration, and internal behavior may change without warning, and such changes may occur during a minor release. Use at your own risk.
+============
+
+[WARNING]
+.Performance
+============
+The current implementation of the encrypted flowfile repository intercepts the serialization of flowfile record data via the `EncryptedSchemaRepositoryRecordSerde` and uses the `AES/GCM` algorithm, which is fairly performant on commodity hardware. This use of an authenticated encryption algorithm (AEAD) block cipher (because the content length is limited and known a priori) is the same as the <<encrypted-provenance,Encrypted Provenance Repository>>, but differs from the unauthenticated s [...]
+============
+
+=== What is it?
+
+The `EncryptedSequentialAccessWriteAheadLog` is a new implementation of the flowfile write-ahead log which encrypts all flowfile attribute data before it is written to the repository. This allows for storage on systems where OS-level access controls are not sufficient to protect the data while still allowing querying and access to the data through the NiFi UI/API.
+
+=== How does it work?
+
+The `SequentialAccessWriteAheadLog` was introduced in NiFi 1.6.0 and provided a faster flowfile repository implementation. The encrypted version wraps that implementation with functionality to transparently encrypt and decrypt the serialized `RepositoryRecord` objects during file system interaction. During all writes to disk (swapping, snapshotting, journaling, and checkpointing), the flowfile containers are serialized to bytes based on a schema, and this serialized form is encrypted bef [...]
+
+The fully qualified class `org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog` is specified as the flowfile repository write-ahead log implementation in _nifi.properties_ as the value of `nifi.flowfile.repository.wal.implementation`. In addition, <<administration-guide.adoc#encrypted-write-ahead-flowfile-repository-properties,new properties>> must be populated to allow successful initialization.
+
+==== StaticKeyProvider
+The `StaticKeyProvider` implementation defines keys directly in _nifi.properties_. Individual keys are provided in hexadecimal encoding. The keys can also be encrypted like any other sensitive property in _nifi.properties_ using the <<administration-guide.adoc#encrypt-config_tool,`./encrypt-config.sh`>> tool in the NiFi Toolkit.
+
+The following configuration section would result in a key provider with two available keys, "Key1" (active) and "AnotherKey".
+....
+nifi.flowfile.repository.encryption.key.provider.implementation=org.apache.nifi.security.kms.StaticKeyProvider
+nifi.flowfile.repository.encryption.key.id=Key1
+nifi.flowfile.repository.encryption.key=0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210
+nifi.flowfile.repository.encryption.key.id.AnotherKey=0101010101010101010101010101010101010101010101010101010101010101
+....
+
+==== FileBasedKeyProvider
+The `FileBasedKeyProvider` implementation reads from an encrypted definition file of the format:
+
+....
+key1=NGCpDpxBZNN0DBodz0p1SDbTjC2FG5kp1pCmdUKJlxxtcMSo6GC4fMlTyy1mPeKOxzLut3DRX+51j6PCO5SznA==
+key2=GYxPbMMDbnraXs09eGJudAM5jTvVYp05XtImkAg4JY4rIbmHOiVUUI6OeOf7ZW+hH42jtPgNW9pSkkQ9HWY/vQ==
+key3=SFe11xuz7J89Y/IQ7YbJPOL0/YKZRFL/VUxJgEHxxlXpd/8ELA7wwN59K1KTr3BURCcFP5YGmwrSKfr4OE4Vlg==
+key4=kZprfcTSTH69UuOU3jMkZfrtiVR/eqWmmbdku3bQcUJ/+UToecNB5lzOVEMBChyEXppyXXC35Wa6GEXFK6PMKw==
+key5=c6FzfnKm7UR7xqI2NFpZ+fEKBfSU7+1NvRw+XWQ9U39MONWqk5gvoyOCdFR1kUgeg46jrN5dGXk13sRqE0GETQ==
+....
+
+Each line defines a key ID and then the Base64-encoded cipher text of a 16 byte IV and wrapped AES-128, AES-192, or AES-256 key depending on the JCE policies available. The individual keys are wrapped by AES/GCM encryption using the **master key** defined by `nifi.bootstrap.sensitive.key` in _conf/bootstrap.conf_.
+
+[[flowfile-repository-key-rotation]]
+==== Key Rotation
+Simply update _nifi.properties_ to reference a new key ID in `nifi.flowfile.repository.encryption.key.id`. Previously-encrypted flowfile records can still be decrypted as long as that key is still available in the key definition file or `nifi.flowfile.repository.encryption.key.id.<OldKeyID>` as the key ID is serialized alongside the encrypted record.
+
+=== Writing and Reading FlowFiles
+Once the repository is initialized, all flowfile record write operations are serialized using `RepositoryObjectBlockEncryptor` (the only currently existing implementation is `RepositoryObjectAESGCMEncryptor`) to the provided `DataOutputStream`. The original stream is swapped with a temporary wrapped stream, which encrypts the data written by the wrapped serializer/deserializer via `EncryptedSchemaRepositoryRecordSerde` inline and the encryption metadata (`keyId`, `algorithm`, `version`,  [...]
+
+image:encrypted-flowfile-hex.png["Encrypted flowfile repository journal file on disk"]
+
+On flowfile record read, the process is reversed. The encryption metadata (`RepositoryObjectEncryptionMetadata`) is parsed and used to decrypt the serialized bytes, which are then deserialized into a `DataInputStream` object.
+
+During swaps and recoveries, the flowfile records are deserialized and reserialized, so if the active key has been changed, the flowfile records will be re-encrypted with the new active key.
+
+Within the NiFi UI/API, there is no detectable difference between an encrypted and unencrypted flowfile repository. All framework interactions with flowfiles work as expected with no change to the process.
+
+=== Potential Issues
+
+[WARNING]
+.Switching Implementations
+============
+It is not recommended to switch between any implementation other than `SequentialAccessWriteAheadLog` and the `EncryptedSequentialAccessWriteAheadLog`. To migrate from a different provider, first migrate to the plaintext sequential log, allow NiFi to automatically recover the flowfiles, then stop NiFi and change the configuration to enable encryption. NiFi will automatically recover the plaintext flowfiles from the repository, and begin encrypting them on subsequent writes.
+============
+
+* Switching between unencrypted and encrypted repositories
+** If a user has an existing write-ahead repository (`WriteAheadFlowFileRepository`) that is not encrypted (uses the `SequentialAccessWriteAheadLog`) and switches their configuration to use an encrypted repository, the application handles this and all flowfile records will be recovered on startup. Future writes (including re-serialization of these same flowfiles) will be encrypted. If a user switches from an encrypted repository to an unencrypted repository, the flowfiles cannot be recov [...]
+*** Encrypted -> unencrypted -- if the previous repository implementation was encrypted, these records should be handled seamlessly as long as the key provider available still has the keys used to encrypt the claims (see <<flowfile-repository-key-rotation,Key Rotation>>)
+*** Unencrypted -> encrypted -- currently handled seamlessly for `SequentialAccessWriteAheadLog` but there are other initial implementations which could be handled
+** There is also a future effort to provide a standalone tool in NiFi Toolkit to encrypt/decrypt an existing flowfile repository to make the transition easier. The translation process could take a long time depending on the size of the existing repository, and being able to perform this task outside of application startup would be valuable (link:https://issues.apache.org/jira/browse/NIFI-6994[NIFI-6994^]).
+* Multiple repositories -- No additional effort or testing has been applied to multiple repositories at this time. Current implementations of the flowfile repository allow only for one repository, though it can reside across multiple volumes and partitions. It is possible/likely issues will occur with repositories on different physical devices. There is no option to provide a heterogenous environment (i.e. one encrypted, one plaintext partition/directory).
+* Corruption -- when a disk is filled or corrupted, there have been reported issues with the repository becoming corrupted and recovery steps are necessary. This is likely to continue to be an issue with the encrypted repository, although still limited in scope to individual records (i.e. an entire repository file won't be irrecoverable due to the encryption). It is important for the continued operation of NiFi to ensure that the disk storing the flowfile repository does not run out of a [...]
+
 
 [[experimental_warning]]
 == Experimental Warning
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
index d9581e2..09aff50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
@@ -42,5 +42,28 @@
             <artifactId>nifi-schema-utils</artifactId>
             <version>1.11.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-dateutil</artifactId>
+            <version>${nifi.groovy.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties-loader</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java
new file mode 100644
index 0000000..0cc2d72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.IOException;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.security.kms.CryptoUtils;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+
+public class EncryptedRepositoryRecordSerdeFactory extends StandardRepositoryRecordSerdeFactory {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedRepositoryRecordSerdeFactory.class);
+
+    private FlowFileRepositoryEncryptionConfiguration ffrec;
+
+    public EncryptedRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, NiFiProperties niFiProperties) throws EncryptionException {
+        super(claimManager);
+
+        // Retrieve encryption configuration
+        FlowFileRepositoryEncryptionConfiguration ffrec = new FlowFileRepositoryEncryptionConfiguration(niFiProperties);
+
+        // The configuration should be validated immediately rather than waiting until attempting to deserialize records (initial recovery at startup)
+        if (!CryptoUtils.isValidRepositoryEncryptionConfiguration(ffrec)) {
+            logger.error("The flowfile repository encryption configuration is not valid (see above). Shutting down...");
+            throw new EncryptionException("The flowfile repository encryption configuration is not valid");
+        }
+
+        this.ffrec = ffrec;
+    }
+
+    @Override
+    public SerDe<RepositoryRecord> createSerDe(String encodingName) {
+        // If no encoding is provided, use the encrypted as the default
+        if (encodingName == null || EncryptedSchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
+            // Delegate the creation of the wrapped serde to the standard factory
+            final SerDe<RepositoryRecord> serde = super.createSerDe(null);
+
+            // Retrieve encryption configuration
+            try {
+                return new EncryptedSchemaRepositoryRecordSerde(serde, ffrec);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Could not create Deserializer for Repository Records because the encoding " + encodingName + " requires NiFi properties which could not be loaded");
+            }
+        }
+
+        // If not encrypted, delegate to the standard factory
+        return super.createSerDe(encodingName);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedSchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedSchemaRepositoryRecordSerde.java
new file mode 100644
index 0000000..f6daa4b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedSchemaRepositoryRecordSerde.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.security.kms.EncryptionException;
+import org.apache.nifi.security.kms.KeyProvider;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
+import org.apache.nifi.security.repository.block.RepositoryObjectBlockEncryptor;
+import org.apache.nifi.security.repository.block.aes.RepositoryObjectAESGCMEncryptor;
+import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+/**
+ * This class is an implementation of the {@link SerDe} interface which provides transparent
+ * encryption/decryption of flowfile record data during file system interaction. As of Apache NiFi 1.11.0
+ * (January 2020), this implementation is considered <a href="https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#experimental-warning">*experimental*</a>. For further details, review the
+ * <a href="https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#encrypted-flowfile">Apache NiFi User Guide -
+ * Encrypted FlowFile Repository</a> and
+ * <a href="https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#encrypted-flowfile-repository-properties">Apache NiFi Admin Guide - Encrypted FlowFile
+ * Repository Properties</a>.
+ */
+public class EncryptedSchemaRepositoryRecordSerde implements SerDe<RepositoryRecord> {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRepositoryRecordSerde.class);
+    private final SerDe<RepositoryRecord> wrappedSerDe;
+    private final KeyProvider keyProvider;
+    private String activeKeyId;
+
+    /**
+     * Creates an instance of the serializer/deserializer which wraps another SerDe instance but transparently encrypts/decrypts the data before/after writing/reading from the streams.
+     *
+     * @param wrappedSerDe                              the wrapped SerDe instance which performs the object <-> bytes (de)serialization
+     * @param flowFileRepositoryEncryptionConfiguration the configuration values necessary to encrypt/decrypt the data
+     * @throws IOException if there is a problem retrieving the configuration values
+     */
+    public EncryptedSchemaRepositoryRecordSerde(final SerDe<RepositoryRecord> wrappedSerDe, final FlowFileRepositoryEncryptionConfiguration
+            flowFileRepositoryEncryptionConfiguration) throws IOException {
+        if (wrappedSerDe == null) {
+            throw new IllegalArgumentException("This implementation must be provided another serde instance to function");
+        }
+        this.wrappedSerDe = wrappedSerDe;
+
+        // Initialize the encryption-specific fields
+        this.keyProvider = RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(flowFileRepositoryEncryptionConfiguration);
+
+        // Set active key ID
+        setActiveKeyId(flowFileRepositoryEncryptionConfiguration.getEncryptionKeyId());
+    }
+
+    /**
+     * Creates an instance of the serializer/deserializer which wraps another SerDe instance but transparently encrypts/decrypts the data before/after writing/reading from the streams.
+     *
+     * @param wrappedSerDe   the wrapped SerDe instance which performs the object <-> bytes (de)serialization
+     * @param niFiProperties the configuration values necessary to encrypt/decrypt the data
+     * @throws IOException if there is a problem retrieving the configuration values
+     */
+    public EncryptedSchemaRepositoryRecordSerde(final SerDe<RepositoryRecord> wrappedSerDe, final NiFiProperties niFiProperties) throws IOException {
+        this(wrappedSerDe, new FlowFileRepositoryEncryptionConfiguration(niFiProperties));
+    }
+
+    /**
+     * Returns the active key ID used for encryption.
+     *
+     * @return the active key ID
+     */
+    String getActiveKeyId() {
+        return activeKeyId;
+    }
+
+    /**
+     * Sets the active key ID used for encryption.
+     *
+     * @param activeKeyId the key ID to use
+     */
+    public void setActiveKeyId(String activeKeyId) {
+        // Key must not be blank and key provider must make key available
+        if (StringUtils.isNotBlank(activeKeyId) && keyProvider.keyExists(activeKeyId)) {
+            this.activeKeyId = activeKeyId;
+            logger.debug("Set active key ID to '" + activeKeyId + "'");
+        } else {
+            logger.warn("Attempted to set active key ID to '" + activeKeyId + "' but that is not a valid or available key ID. Keeping active key ID as '" + this.activeKeyId + "'");
+        }
+    }
+
+    @Override
+    public void writeHeader(final DataOutputStream out) throws IOException {
+        wrappedSerDe.writeHeader(out);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Wrote schema header ({} bytes) to output stream", out.size());
+        }
+    }
+
+    @Override
+    public void readHeader(final DataInputStream in) throws IOException {
+        wrappedSerDe.readHeader(in);
+    }
+
+    /**
+     * <p>
+     * Serializes an Edit Record to the log via the given
+     * {@link DataOutputStream}.
+     * </p>
+     *
+     * @param previousRecordState previous state
+     * @param newRecordState      new state
+     * @param out                 stream to write to
+     * @throws IOException if fail during write
+     * @deprecated it is not beneficial to serialize the deltas, so this method just passes through to
+     * {@link #serializeRecord(RepositoryRecord, DataOutputStream)}. It is preferable to use that method directly.
+     */
+    @Deprecated
+    @Override
+    public void serializeEdit(RepositoryRecord previousRecordState, RepositoryRecord newRecordState, DataOutputStream out) throws IOException {
+        serializeRecord(newRecordState, out);
+    }
+
+    /**
+     * Serializes the provided {@link RepositoryRecord} to the provided stream in an encrypted format.
+     *
+     * @param record the record to encrypt and serialize
+     * @param out    the output stream to write to
+     * @throws IOException if there is a problem writing to the stream
+     */
+    @Override
+    public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        // Create BAOS wrapped in DOS to intercept the output
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        DataOutputStream tempDataStream = new DataOutputStream(byteArrayOutputStream);
+
+        // Use the delegate to serialize the actual record and extract the output
+        String recordId = getRecordIdentifier(record).toString();
+        wrappedSerDe.serializeRecord(record, tempDataStream);
+        tempDataStream.flush();
+        byte[] plainSerializedBytes = byteArrayOutputStream.toByteArray();
+        logger.debug("Serialized flowfile record {} to temp stream with length {}", recordId, plainSerializedBytes.length);
+
+        // Encrypt the serialized byte[] to the real stream
+        encryptToStream(plainSerializedBytes, recordId, out);
+        logger.debug("Encrypted serialized flowfile record {} to actual output stream", recordId);
+    }
+
+    /**
+     * Encrypts the plain serialized bytes and writes them to the output stream. Precedes
+     * the cipher bytes with the plaintext
+     * {@link org.apache.nifi.security.repository.RepositoryObjectEncryptionMetadata} data
+     * to allow for on-demand deserialization and decryption.
+     *
+     * @param plainSerializedBytes the plain serialized bytes
+     * @param recordId             the unique identifier for this record to be stored in the encryption metadata
+     * @param out                  the output stream
+     * @throws IOException if there is a problem writing to the stream
+     */
+    private void encryptToStream(byte[] plainSerializedBytes, String recordId, DataOutputStream out) throws IOException {
+        try {
+            RepositoryObjectBlockEncryptor encryptor = new RepositoryObjectAESGCMEncryptor();
+            encryptor.initialize(keyProvider);
+            logger.debug("Initialized {} for flowfile record {}", encryptor.toString(), recordId);
+
+            byte[] cipherBytes = encryptor.encrypt(plainSerializedBytes, recordId, getActiveKeyId());
+            logger.debug("Encrypted {} bytes for flowfile record {}", cipherBytes.length, recordId);
+
+            // Maybe remove cipher bytes length because it's included in encryption metadata; deserialization might need to change?
+            out.writeInt(cipherBytes.length);
+            out.write(cipherBytes);
+            logger.debug("Wrote {} bytes (encrypted, including length) for flowfile record {} to output stream", cipherBytes.length + 4, recordId);
+        } catch (KeyManagementException | EncryptionException e) {
+            logger.error("Encountered an error encrypting & serializing flowfile record {} due to {}", recordId, e.getLocalizedMessage());
+            if (logger.isDebugEnabled()) {
+                logger.debug(e.getLocalizedMessage(), e);
+            }
+
+            // The calling method contract only throws IOException
+            throw new IOException("Encountered an error encrypting & serializing flowfile record " + recordId, e);
+        }
+    }
+
+    /**
+     * <p>
+     * Reads an Edit Record from the given {@link DataInputStream} and merges
+     * that edit with the current version of the record, returning the new,
+     * merged version. If the Edit Record indicates that the entity was deleted,
+     * must return a Record with an UpdateType of {@link UpdateType#DELETE}.
+     * This method must never return <code>null</code>.
+     * </p>
+     *
+     * @param in                  to deserialize from
+     * @param currentRecordStates an unmodifiable map of Record ID's to the
+     *                            current state of that record
+     * @param version             the version of the SerDe that was used to serialize the
+     *                            edit record
+     * @return deserialized record
+     * @throws IOException if failure reading
+     * @deprecated it is not beneficial to serialize the deltas, so this method throws a {@link EOFException}. It is preferable to use {@link #deserializeRecord(DataInputStream, int)}.
+     */
+    @Deprecated
+    @Override
+    public RepositoryRecord deserializeEdit(DataInputStream in, Map<Object, RepositoryRecord> currentRecordStates, int version) throws IOException {
+        return deserializeRecord(in, version);
+
+        // deserializeRecord may return a null if there is no more data. However, when we are deserializing
+        // an edit, we do so only when we know that we should have data. This is why the JavaDocs for this method
+        // on the interface indicate that this method should never return null. As a result, if there is no data
+        // available, we handle this by throwing an EOFException.
+        // throw new EOFException();
+    }
+
+    /**
+     * Returns the deserialized and decrypted {@link RepositoryRecord} from the input stream.
+     *
+     * @param in      stream to read from
+     * @param version the version of the SerDe that was used to serialize the
+     *                record
+     * @return the deserialized record
+     * @throws IOException if there is a problem reading from the stream
+     */
+    @Override
+    public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
+        // Read the expected length of the encrypted record (including the encryption metadata)
+        int encryptedRecordLength = in.readInt();
+        if (encryptedRecordLength == -1) {
+            return null;
+        }
+
+        // Read the encrypted record bytes
+        byte[] cipherBytes = new byte[encryptedRecordLength];
+        StreamUtils.fillBuffer(in, cipherBytes);
+        logger.debug("Read {} bytes (encrypted, including length) from actual input stream", encryptedRecordLength + 4);
+
+        // Decrypt the byte[]
+        DataInputStream wrappedInputStream = decryptToStream(cipherBytes);
+
+        // Deserialize the plain bytes using the delegate serde
+        final RepositoryRecord deserializedRecord = wrappedSerDe.deserializeRecord(wrappedInputStream, version);
+        logger.debug("Deserialized flowfile record {} from temp stream", getRecordIdentifier(deserializedRecord));
+        return deserializedRecord;
+    }
+
+    /**
+     * Returns a {@link DataInputStream} containing the plaintext bytes so the record can be properly deserialized by the wrapped serde.
+     *
+     * @param cipherBytes the serialized, encrypted bytes
+     * @return a stream wrapping the plain bytes
+     */
+    private DataInputStream decryptToStream(byte[] cipherBytes) throws IOException {
+        try {
+            RepositoryObjectBlockEncryptor encryptor = new RepositoryObjectAESGCMEncryptor();
+            encryptor.initialize(keyProvider);
+            logger.debug("Initialized {} for decrypting flowfile record", encryptor.toString());
+
+            byte[] plainSerializedBytes = encryptor.decrypt(cipherBytes, "[pending record ID]");
+            logger.debug("Decrypted {} bytes for flowfile record", cipherBytes.length);
+
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(plainSerializedBytes);
+            return new DataInputStream(byteArrayInputStream);
+        } catch (KeyManagementException | EncryptionException e) {
+            logger.error("Encountered an error decrypting & deserializing flowfile record due to {}", e.getLocalizedMessage());
+            if (logger.isDebugEnabled()) {
+                logger.debug(e.getLocalizedMessage(), e);
+            }
+
+            // The calling method contract only throws IOException
+            throw new IOException("Encountered an error decrypting & deserializing flowfile record", e);
+        }
+    }
+
+    /**
+     * Returns the unique ID for the given record.
+     *
+     * @param record to obtain identifier for
+     * @return identifier of record
+     */
+    @Override
+    public Object getRecordIdentifier(RepositoryRecord record) {
+        return wrappedSerDe.getRecordIdentifier(record);
+    }
+
+    /**
+     * Returns the UpdateType for the given record.
+     *
+     * @param record to retrieve update type for
+     * @return update type
+     */
+    @Override
+    public UpdateType getUpdateType(RepositoryRecord record) {
+        return wrappedSerDe.getUpdateType(record);
+    }
+
+    /**
+     * Returns the external location of the given record; this is used when a
+     * record is moved away from WALI or is being re-introduced to WALI. For
+     * example, WALI can be updated with a record of type
+     * {@link UpdateType#SWAP_OUT} that indicates a Location of
+     * file://tmp/external1 and can then be re-introduced to WALI by updating
+     * WALI with a record of type {@link UpdateType#CREATE} that indicates a
+     * Location of file://tmp/external1
+     *
+     * @param record to get location of
+     * @return location
+     */
+    @Override
+    public String getLocation(RepositoryRecord record) {
+        return wrappedSerDe.getLocation(record);
+    }
+
+    /**
+     * Returns the version that this SerDe will use when writing. This used used
+     * when serializing/deserializing the edit logs so that if the version
+     * changes, we are still able to deserialize old versions
+     *
+     * @return version
+     */
+    @Override
+    public int getVersion() {
+        return wrappedSerDe.getVersion();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java
index f24ac08..549ba87 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java
@@ -17,13 +17,12 @@
 
 package org.apache.nifi.controller.repository;
 
+import java.util.Map;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.wali.SerDe;
 import org.wali.UpdateType;
 
-import java.util.Map;
-
 public class StandardRepositoryRecordSerdeFactory implements RepositoryRecordSerdeFactory {
     private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
     private final ResourceClaimManager resourceClaimManager;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
index e8ce44e..4155c44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
@@ -23,9 +23,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -161,7 +161,7 @@ public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde imple
             }
 
             final FlowFileRecord flowFileRecord = ffBuilder.build();
-            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+            final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFileRecord);
             record.markForDelete();
 
             return record;
@@ -283,7 +283,7 @@ public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde imple
             }
 
             final FlowFileRecord flowFileRecord = ffBuilder.build();
-            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+            final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFileRecord);
             record.markForDelete();
             return record;
         }
@@ -448,7 +448,7 @@ public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde imple
     }
 
     private void writeString(final String toWrite, final OutputStream out) throws IOException {
-        final byte[] bytes = toWrite.getBytes("UTF-8");
+        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
         final int utflen = bytes.length;
 
         if (utflen < 65535) {
@@ -473,7 +473,7 @@ public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde imple
         }
         final byte[] bytes = new byte[numBytes];
         fillBuffer(in, bytes, numBytes);
-        return new String(bytes, "UTF-8");
+        return new String(bytes, StandardCharsets.UTF_8);
     }
 
     private Integer readFieldLength(final InputStream in) throws IOException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/groovy/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactoryTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/groovy/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactoryTest.groovy
new file mode 100644
index 0000000..5a504dd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/groovy/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactoryTest.groovy
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository
+
+
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
+import org.apache.nifi.properties.StandardNiFiProperties
+import org.apache.nifi.security.kms.EncryptionException
+import org.apache.nifi.util.NiFiProperties
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TestName
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.wali.SerDe
+
+import java.security.Security
+
+import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME
+
+@RunWith(JUnit4.class)
+class EncryptedRepositoryRecordSerdeFactoryTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedRepositoryRecordSerdeFactoryTest.class)
+
+    private ResourceClaimManager claimManager
+
+    private static final String KEY_ID = "K1"
+    private static final String KEY_1_HEX = "0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210"
+
+    private NiFiProperties mockNiFiProperties
+
+    @Rule
+    public TestName testName = new TestName()
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+        claimManager = new StandardResourceClaimManager()
+
+        Map flowfileEncryptionProps = [
+                (NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): STATIC_KEY_PROVIDER_CLASS_NAME,
+                (NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY)                              : KEY_1_HEX,
+                (NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID)                           : KEY_ID
+        ]
+        mockNiFiProperties = new StandardNiFiProperties(new Properties(flowfileEncryptionProps))
+    }
+
+    @After
+    void tearDown() throws Exception {
+        claimManager.purge()
+    }
+
+    @Test
+    void testShouldCreateEncryptedSerde() {
+        // Arrange
+        EncryptedRepositoryRecordSerdeFactory factory = new EncryptedRepositoryRecordSerdeFactory(claimManager, mockNiFiProperties)
+
+        // Act
+        SerDe<RepositoryRecord> serde = factory.createSerDe(EncryptedSchemaRepositoryRecordSerde.class.name)
+        logger.info("Created serde: ${serde} ")
+
+        // Assert
+        assert serde instanceof EncryptedSchemaRepositoryRecordSerde
+    }
+
+    @Test
+    void testShouldCreateEncryptedSerdeForNullEncoding() {
+        // Arrange
+        EncryptedRepositoryRecordSerdeFactory factory = new EncryptedRepositoryRecordSerdeFactory(claimManager, mockNiFiProperties)
+
+        // Act
+        SerDe<RepositoryRecord> serde = factory.createSerDe(null)
+        logger.info("Created serde: ${serde} ")
+
+        // Assert
+        assert serde instanceof EncryptedSchemaRepositoryRecordSerde
+    }
+
+    @Test
+    void testShouldCreateStandardSerdeForStandardEncoding() {
+        // Arrange
+        EncryptedRepositoryRecordSerdeFactory factory = new EncryptedRepositoryRecordSerdeFactory(claimManager, mockNiFiProperties)
+
+        // Act
+        SerDe<RepositoryRecord> serde = factory.createSerDe(SchemaRepositoryRecordSerde.class.name)
+        logger.info("Created serde: ${serde} ")
+
+        // Assert
+        assert serde instanceof SchemaRepositoryRecordSerde
+    }
+
+    @Test
+    void testCreateSerDeShouldFailWithUnpopulatedNiFiProperties() {
+        // Arrange
+        NiFiProperties emptyNiFiProperties = new StandardNiFiProperties(new Properties([:]))
+
+        // Act
+        def msg = shouldFail(EncryptionException) {
+            EncryptedRepositoryRecordSerdeFactory factory = new EncryptedRepositoryRecordSerdeFactory(claimManager, emptyNiFiProperties)
+        }
+        logger.expected(msg)
+
+        // Assert
+        assert msg =~ "The flowfile repository encryption configuration is not valid"
+    }
+
+    @Test
+    void testConstructorShouldFailWithInvalidNiFiProperties() {
+        // Arrange
+        Map invalidFlowfileEncryptionProps = [
+                (NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): STATIC_KEY_PROVIDER_CLASS_NAME.reverse(),
+                (NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY)                              : KEY_1_HEX,
+                (NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID)                           : KEY_ID
+        ]
+        NiFiProperties invalidNiFiProperties = new StandardNiFiProperties(new Properties(invalidFlowfileEncryptionProps))
+
+        // Act
+        def msg = shouldFail(EncryptionException) {
+            EncryptedRepositoryRecordSerdeFactory factory = new EncryptedRepositoryRecordSerdeFactory(claimManager, invalidNiFiProperties)
+        }
+        logger.expected(msg)
+
+        // Assert
+        assert msg =~ "The flowfile repository encryption configuration is not valid"
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/groovy/org/apache/nifi/controller/repository/EncryptedSchemaRepositoryRecordSerdeTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/groovy/org/apache/nifi/controller/repository/EncryptedSchemaRepositoryRecordSerdeTest.groovy
new file mode 100644
index 0000000..fcf3c98
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/groovy/org/apache/nifi/controller/repository/EncryptedSchemaRepositoryRecordSerdeTest.groovy
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository
+
+import org.apache.nifi.controller.queue.FlowFileQueue
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
+import org.apache.nifi.security.kms.CryptoUtils
+import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TestName
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.wali.SerDe
+
+import java.security.Security
+
+import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME
+
+@RunWith(JUnit4.class)
+class EncryptedSchemaRepositoryRecordSerdeTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRepositoryRecordSerdeTest.class)
+
+    public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"
+
+    private ResourceClaimManager claimManager
+    private Map<String, FlowFileQueue> queueMap
+    private FlowFileQueue flowFileQueue
+    private ByteArrayOutputStream byteArrayOutputStream
+    private DataOutputStream dataOutputStream
+
+    // TODO: Mock the wrapped serde
+    // TODO: Make integration test with real wrapped serde
+    private SerDe<RepositoryRecord> wrappedSerDe
+
+    private static final String KPI = STATIC_KEY_PROVIDER_CLASS_NAME
+    private static final String KPL = ""
+    private static final String KEY_ID = "K1"
+    private static final Map<String, String> KEYS = [K1: "0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210"]
+    // TODO: Change to WAL impl name
+    private static final String REPO_IMPL = CryptoUtils.EWAFFR_CLASS_NAME
+
+    private FlowFileRepositoryEncryptionConfiguration flowFileREC
+
+    private EncryptedSchemaRepositoryRecordSerde esrrs
+
+    @Rule
+    public TestName testName = new TestName()
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+        claimManager = new StandardResourceClaimManager()
+        queueMap = [:]
+        flowFileQueue = createAndRegisterMockQueue(TEST_QUEUE_IDENTIFIER)
+        byteArrayOutputStream = new ByteArrayOutputStream()
+        dataOutputStream = new DataOutputStream(byteArrayOutputStream)
+        wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager)
+        wrappedSerDe.setQueueMap(queueMap)
+
+        flowFileREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, KEYS, REPO_IMPL)
+
+        esrrs = new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, flowFileREC)
+    }
+
+    @After
+    void tearDown() throws Exception {
+        claimManager.purge()
+        queueMap.clear()
+    }
+
+    private FlowFileQueue createMockQueue(String identifier = testName.methodName + new Date().toString()) {
+        [getIdentifier: { ->
+            logger.mock("Retrieving flowfile queue identifier: ${identifier}")
+            identifier
+        }] as FlowFileQueue
+    }
+
+    private FlowFileQueue createAndRegisterMockQueue(String identifier = testName.methodName + new Date().toString()) {
+        FlowFileQueue queue = createMockQueue(identifier)
+        queueMap.put(identifier, queue)
+        queue
+    }
+
+    private RepositoryRecord buildCreateRecord(FlowFileQueue queue, Map<String, String> attributes = [:]) {
+        StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
+        StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime())
+        ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, String>)
+        record.setWorking(ffrb.build())
+        record
+    }
+
+    private String getMockUUID() {
+        "${testName.methodName ?: "no_test"}@${new Date().format("mmssSSS")}" as String
+    }
+
+    /** This test ensures that the creation of a flowfile record is applied to the specified output stream correctly */
+    @Test
+    void testShouldSerializeAndDeserializeRecord() {
+        // Arrange
+        RepositoryRecord newRecord = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
+        DataOutputStream dos = dataOutputStream
+
+        esrrs.writeHeader(dataOutputStream)
+
+        // Act
+        esrrs.serializeRecord(newRecord, dos)
+        logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
+
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
+        esrrs.readHeader(dis)
+        RepositoryRecord deserializedRecord = esrrs.deserializeRecord(dis, 2)
+
+        /* The records will not be identical, because the process of serializing/deserializing changes the application
+         * of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
+         * "original" record containing attributes */
+
+        // Assert
+        logger.info("    Original record: ${newRecord.dump()}")
+        logger.info("Deserialized record: ${deserializedRecord.dump()}")
+        assert newRecord.type == RepositoryRecordType.CREATE
+        assert deserializedRecord.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord.originalAttributes == newRecord.current.attributes
+    }
+
+    /** This test ensures that the creation of a flowfile record is applied to the specified output stream correctly */
+    @Test
+    void testShouldSerializeAndDeserializeEdit() {
+        // Arrange
+        RepositoryRecord newRecord = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
+        DataOutputStream dos = dataOutputStream
+
+        esrrs.writeHeader(dataOutputStream)
+
+        // Act
+        esrrs.serializeEdit(null, newRecord, dos)
+        logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
+
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
+        esrrs.readHeader(dis)
+        RepositoryRecord deserializedRecord = esrrs.deserializeEdit(dis, [:], 2)
+
+        /* The records will not be identical, because the process of serializing/deserializing changes the application
+         * of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
+         * "original" record containing attributes */
+
+        // Assert
+        logger.info("    Original record: ${newRecord.dump()}")
+        logger.info("Deserialized record: ${deserializedRecord.dump()}")
+        assert newRecord.type == RepositoryRecordType.CREATE
+        assert deserializedRecord.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord.originalAttributes == newRecord.current.attributes
+    }
+
+    /** This test ensures that the creation of a flowfile record is applied to the specified output stream correctly with encryption */
+    @Test
+    void testShouldEncryptOutput() {
+        // Arrange
+        RepositoryRecord newRecord = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
+        DataOutputStream dos = dataOutputStream
+
+        esrrs.writeHeader(dataOutputStream)
+
+        // Act
+        esrrs.serializeRecord(newRecord, dos)
+        byte[] serializedBytes = byteArrayOutputStream.toByteArray()
+        def hexOutput = Hex.toHexString(serializedBytes)
+        logger.info("Output stream (${serializedBytes.length} bytes): ${hexOutput} ")
+
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes))
+        esrrs.readHeader(dis)
+        RepositoryRecord deserializedRecord = esrrs.deserializeRecord(dis, 2)
+
+        /* The records will not be identical, because the process of serializing/deserializing changes the application
+         * of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
+         * "original" record containing attributes */
+
+        // Assert
+        logger.info("    Original record: ${newRecord.dump()}")
+        logger.info("Deserialized record: ${deserializedRecord.dump()}")
+        assert newRecord.type == RepositoryRecordType.CREATE
+        assert deserializedRecord.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord.originalAttributes == newRecord.current.attributes
+
+        // Ensure the value is not present in plaintext
+        assert !hexOutput.contains(Hex.toHexString("Andy".bytes))
+
+        // Ensure the value is not present in "simple" encryption (reversing bytes)
+        assert !hexOutput.contains(Hex.toHexString("ydnA".bytes))
+
+        // Ensure the encryption metadata is present in the output
+        assert hexOutput.contains(Hex.toHexString("org.apache.nifi.security.repository.block.BlockEncryptionMetadata".bytes))
+    }
+
+    /** This test ensures that multiple records can be serialized and deserialized */
+    @Test
+    void testShouldSerializeAndDeserializeMultipleRecords() {
+        // Arrange
+        RepositoryRecord record1 = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
+        RepositoryRecord record2 = buildCreateRecord(flowFileQueue, [id: "2", firstName: "Mark", lastName: "Payne"])
+        DataOutputStream dos = dataOutputStream
+
+        // Act
+        esrrs.writeHeader(dos)
+        esrrs.serializeRecord(record1, dos)
+        esrrs.serializeRecord(record2, dos)
+        dos.flush()
+        logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
+
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
+        esrrs.readHeader(dis)
+        RepositoryRecord deserializedRecord1 = esrrs.deserializeRecord(dis, esrrs.getVersion())
+        RepositoryRecord deserializedRecord2 = esrrs.deserializeRecord(dis, esrrs.getVersion())
+
+        /* The records will not be identical, because the process of serializing/deserializing changes the application
+         * of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
+         * "original" record containing attributes */
+
+        logger.info("Original record 1: ${record1.dump()}")
+        logger.info("Original record 2: ${record2.dump()}")
+        logger.info("Deserialized record 1: ${deserializedRecord1.dump()}")
+        logger.info("Deserialized record 2: ${deserializedRecord2.dump()}")
+
+        // Assert
+        assert record1.type == RepositoryRecordType.CREATE
+        assert record2.type == RepositoryRecordType.CREATE
+
+        assert deserializedRecord1.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord1.originalAttributes == record1.current.attributes
+
+        assert deserializedRecord2.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord2.originalAttributes == record2.current.attributes
+    }
+
+    /** This test ensures that multiple records can be serialized and deserialized using different keys */
+    @Test
+    void testShouldSerializeAndDeserializeMultipleRecordsWithMultipleKeys() {
+        // Arrange
+        RepositoryRecord record1 = buildCreateRecord(flowFileQueue, [id: "1", firstName: "Andy", lastName: "LoPresto"])
+        RepositoryRecord record2 = buildCreateRecord(flowFileQueue, [id: "2", firstName: "Mark", lastName: "Payne"])
+        DataOutputStream dos = dataOutputStream
+
+        // Configure the serde with multiple keys available
+        def multipleKeys = KEYS + [K2: "0F" * 32]
+        FlowFileRepositoryEncryptionConfiguration multipleKeyFFREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, multipleKeys, REPO_IMPL)
+
+        esrrs = new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, multipleKeyFFREC)
+        assert esrrs.getActiveKeyId() == "K1"
+
+        // Act
+        esrrs.writeHeader(dos)
+        esrrs.serializeRecord(record1, dos)
+
+        // Change the active key
+        esrrs.setActiveKeyId("K2")
+        esrrs.serializeRecord(record2, dos)
+        dos.flush()
+        logger.info("Output stream: ${Hex.toHexString(byteArrayOutputStream.toByteArray())} ")
+
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))
+        esrrs.readHeader(dis)
+        RepositoryRecord deserializedRecord1 = esrrs.deserializeRecord(dis, esrrs.getVersion())
+        RepositoryRecord deserializedRecord2 = esrrs.deserializeRecord(dis, esrrs.getVersion())
+
+        /* The records will not be identical, because the process of serializing/deserializing changes the application
+         * of the delta data. The CREATE with a "current" record containing attributes becomes an UPDATE with an
+         * "original" record containing attributes */
+
+        logger.info("Original record 1: ${record1.dump()}")
+        logger.info("Original record 2: ${record2.dump()}")
+        logger.info("Deserialized record 1: ${deserializedRecord1.dump()}")
+        logger.info("Deserialized record 2: ${deserializedRecord2.dump()}")
+
+        // Assert
+        assert record1.type == RepositoryRecordType.CREATE
+        assert record2.type == RepositoryRecordType.CREATE
+
+        assert deserializedRecord1.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord1.originalAttributes == record1.current.attributes
+
+        assert deserializedRecord2.type == RepositoryRecordType.UPDATE
+        assert deserializedRecord2.originalAttributes == record2.current.attributes
+    }
+
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..617dcb6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/resources/logback-test.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+<!--<configuration debug="true">-->
+    <!--<statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />-->
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%-4r [%t] %-5p %c{3} - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>./target/log</file>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
+        </encoder>
+    </appender>
+
+
+    <logger name="org.apache.nifi" level="INFO"/>
+    <logger name="org.apache.nifi.controller.tasks" level="DEBUG"/>
+    <logger name="org.apache.nifi.controller.service" level="DEBUG"/>
+    <logger name="org.apache.nifi.encrypt" level="DEBUG"/>
+    <logger name="org.apache.nifi.controller.repository" level="DEBUG"/>
+    <logger name="org.apache.nifi.security.repository" level="DEBUG"/>
+    <logger name="org.apache.nifi.controller.service.mock" level="ERROR"/>
+
+    <logger name="StandardProcessSession.claims" level="INFO" />
+
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+
+</configuration>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index d9b6659..07467eb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -242,6 +242,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.codehaus.groovy</groupId>
+            <artifactId>groovy-dateutil</artifactId>
+            <version>${nifi.groovy.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.github.ben-manes.caffeine</groupId>
             <artifactId>caffeine</artifactId>
             <version>1.0.1</version>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 86cfab9..1ff685d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -46,10 +46,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
-
 import javax.management.NotificationEmitter;
 import javax.net.ssl.SSLContext;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -116,6 +114,7 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.EncryptedRepositoryRecordSerdeFactory;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
@@ -127,6 +126,7 @@ import org.apache.nifi.controller.repository.StandardQueueProvider;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -213,6 +213,7 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
+import org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
@@ -512,7 +513,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);
 
         eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
-            eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue, repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
+                eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider,
+                eventDrivenWorkerQueue, repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
@@ -551,7 +553,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         this.reloadComponent = new StandardReloadComponent(this);
 
         final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
-            nifiProperties, encryptor, this, new MutableVariableRegistry(this.variableRegistry));
+                nifiProperties, encryptor, this, new MutableVariableRegistry(this.variableRegistry));
         rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
         setRootGroup(rootGroup);
         instanceId = ComponentIdGenerator.generateId().toString();
@@ -605,9 +607,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
         componentStatusRepository = createComponentStatusRepository();
 
-        final boolean analyticsEnabled =  Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_ENABLED));
+        final boolean analyticsEnabled = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_ENABLED));
 
-        if(analyticsEnabled) {
+        if (analyticsEnabled) {
 
             // Determine interval for predicting future feature values
             final String predictionInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL);
@@ -615,8 +617,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             try {
                 predictionIntervalMillis = FormatUtils.getTimeDuration(predictionInterval, TimeUnit.MILLISECONDS);
             } catch (final Exception e) {
-                LOG.warn("Analytics is enabled however could not retrieve value for "+ NiFiProperties.ANALYTICS_PREDICTION_INTERVAL + ". This property has been set to '"
-                                                        + NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL + "'");
+                LOG.warn("Analytics is enabled however could not retrieve value for " + NiFiProperties.ANALYTICS_PREDICTION_INTERVAL + ". This property has been set to '"
+                        + NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL + "'");
                 predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS);
             }
 
@@ -626,7 +628,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             try {
                 queryIntervalMillis = FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS);
             } catch (final Exception e) {
-                LOG.warn("Analytics is enabled however could not retrieve value for "+ NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has been set to '"
+                LOG.warn("Analytics is enabled however could not retrieve value for " + NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has been set to '"
                         + NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL + "'");
                 queryIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL, TimeUnit.MILLISECONDS);
             }
@@ -638,10 +640,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             Double modelScoreThreshold;
             try {
                 modelScoreThreshold = Double.valueOf(nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD,
-                                                            Double.toString(NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD)));
+                        Double.toString(NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD)));
             } catch (final Exception e) {
-                LOG.warn("Analytics is enabled however could not retrieve value for "+ NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD + ". This property has been set to '"
-                                                        + NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD + "'.");
+                LOG.warn("Analytics is enabled however could not retrieve value for " + NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD + ". This property has been set to '"
+                        + NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD + "'.");
                 modelScoreThreshold = NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD;
             }
 
@@ -691,7 +693,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                 // we may later determine that there is in fact no Cluster Coordinator. If this happens, we will automatically register for
                 // Cluster Coordinator through the StandardFlowService.
                 LOG.info("The Election for Cluster Coordinator has already begun (Leader is {}). Will not register to be elected for this role until after connecting "
-                    + "to the cluster and inheriting the cluster's flow.", clusterCoordinatorAddress);
+                        + "to the cluster and inheriting the cluster's flow.", clusterCoordinatorAddress);
                 registerForClusterCoordinator(false);
             }
 
@@ -715,13 +717,13 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
             final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE);
             final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository),
-                eventReporter, new StandardLoadBalanceFlowFileCodec());
+                    eventReporter, new StandardLoadBalanceFlowFileCodec());
             loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);
 
             final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
             loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true);
 
-            for (int i=0; i < loadBalanceClientThreadCount; i++) {
+            for (int i = 0; i < loadBalanceClientThreadCount; i++) {
                 final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(loadBalanceClientRegistry, clusterCoordinator, eventReporter);
                 loadBalanceClientTasks.add(clientTask);
                 loadBalanceClientThreadPool.submit(clientTask);
@@ -752,9 +754,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         }
 
         try {
-            final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileRepository.class, properties);
-            synchronized (created) {
-                created.initialize(contentClaimManager);
+            final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName,
+                    FlowFileRepository.class, properties);
+            if (EncryptedSequentialAccessWriteAheadLog.class.getName().equals(properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION))
+                    && created instanceof WriteAheadFlowFileRepository) {
+                synchronized (created) {
+                    ((WriteAheadFlowFileRepository) created).initialize(contentClaimManager, new EncryptedRepositoryRecordSerdeFactory(contentClaimManager, properties));
+                }
+            } else {
+                synchronized (created) {
+                    created.initialize(contentClaimManager);
+                }
             }
             return created;
         } catch (final Exception e) {
@@ -850,7 +860,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
             // Begin expiring FlowFiles that are old
             final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
-                flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
+                    flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
             processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
 
             // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the
@@ -1172,10 +1182,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
      * has finished.
      *
      * @param kill if <code>true</code>, attempts to stop all active threads,
-     * but makes no guarantee that this will happen
-     *
+     *             but makes no guarantee that this will happen
      * @throws IllegalStateException if the controller is already stopped or
-     * currently in the processor of stopping
+     *                               currently in the processor of stopping
      */
     public void shutdown(final boolean kill) {
         this.shutdown = true;
@@ -1299,9 +1308,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
      * Serializes the current state of the controller to the given OutputStream
      *
      * @param serializer serializer
-     * @param os stream
+     * @param os         stream
      * @throws FlowSerializationException if serialization of the flow fails for
-     * any reason
+     *                                    any reason
      */
     public synchronized <T> void serialize(final FlowSerializer<T> serializer, final OutputStream os) throws FlowSerializationException {
         T flowConfiguration;
@@ -1341,24 +1350,23 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
     /**
      * Synchronizes this controller with the proposed flow.
-     *
+     * <p>
      * For more details, see
      * {@link FlowSynchronizer#sync(FlowController, DataFlow, StringEncryptor)}.
      *
      * @param synchronizer synchronizer
-     * @param dataFlow the flow to load the controller with. If the flow is null
-     * or zero length, then the controller must not have a flow or else an
-     * UninheritableFlowException will be thrown.
-     *
-     * @throws FlowSerializationException if proposed flow is not a valid flow
-     * configuration file
-     * @throws UninheritableFlowException if the proposed flow cannot be loaded
-     * by the controller because in doing so would risk orphaning flow files
+     * @param dataFlow     the flow to load the controller with. If the flow is null
+     *                     or zero length, then the controller must not have a flow or else an
+     *                     UninheritableFlowException will be thrown.
+     * @throws FlowSerializationException   if proposed flow is not a valid flow
+     *                                      configuration file
+     * @throws UninheritableFlowException   if the proposed flow cannot be loaded
+     *                                      by the controller because doing so would risk orphaning flow files
      * @throws FlowSynchronizationException if updates to the controller failed.
-     * If this exception is thrown, then the controller should be considered
-     * unsafe to be used
-     * @throws MissingBundleException if the proposed flow cannot be loaded by the
-     * controller because it contains a bundle that does not exist in the controller
+     *                                      If this exception is thrown, then the controller should be considered
+     *                                      unsafe to be used
+     * @throws MissingBundleException       if the proposed flow cannot be loaded by the
+     *                                      controller because it contains a bundle that does not exist in the controller
      */
     public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
             throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
@@ -1441,10 +1449,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
      * Sets the root group to the given group
      *
      * @param group the ProcessGroup that is to become the new Root Group
-     *
      * @throws IllegalArgumentException if the ProcessGroup has a parent
-     * @throws IllegalStateException if the FlowController does not know about
-     * the given process group
+     * @throws IllegalStateException    if the FlowController does not know about
+     *                                  the given process group
      */
     void setRootGroup(final ProcessGroup group) {
         if (requireNonNull(group).getParent() != null) {
@@ -1619,7 +1626,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     }
 
 
-
     public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
         final List<GarbageCollectionStatus> statuses = new ArrayList<>();
 
@@ -1769,7 +1775,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     }
 
 
-
     @Override
     public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
         if (isTerminated()) {
@@ -1810,18 +1815,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     /**
      * Creates a connection between two Connectable objects.
      *
-     * @param id required ID of the connection
-     * @param name the name of the connection, or <code>null</code> to leave the
-     * connection unnamed
-     * @param source required source
-     * @param destination required destination
+     * @param id                required ID of the connection
+     * @param name              the name of the connection, or <code>null</code> to leave the
+     *                          connection unnamed
+     * @param source            required source
+     * @param destination       required destination
      * @param relationshipNames required collection of relationship names
-     * @return
-     *
-     * @throws NullPointerException if the ID, source, destination, or set of
-     * relationships is null.
-     * @throws IllegalArgumentException if <code>relationships</code> is an
-     * empty collection
+     * @return the connection
+     * @throws NullPointerException     if the ID, source, destination, or set of relationships is null.
+     * @throws IllegalArgumentException if <code>relationships</code> is an empty collection
      */
     public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
         final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler);
@@ -1863,10 +1865,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
                 if (clusterCoordinator == null) {
                     flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
-                        eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
+                            eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
                 } else {
                     flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
-                        clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
+                            clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
 
                     flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
                     flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
@@ -1877,18 +1879,17 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         };
 
         final Connection connection = builder.id(requireNonNull(id).intern())
-            .name(name == null ? null : name.intern())
-            .relationships(relationships)
-            .source(requireNonNull(source))
-            .destination(destination)
-            .flowFileQueueFactory(flowFileQueueFactory)
-            .build();
+                .name(name == null ? null : name.intern())
+                .relationships(relationships)
+                .source(requireNonNull(source))
+                .destination(destination)
+                .flowFileQueueFactory(flowFileQueueFactory)
+                .build();
 
         return connection;
     }
 
 
-
     @Override
     public ReportingTaskNode getReportingTaskNode(final String identifier) {
         return flowManager.getReportingTaskNode(identifier);
@@ -1941,7 +1942,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     }
 
 
-
     //
     // Counters
     //
@@ -2060,6 +2060,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     //
     // Clustering methods
     //
+
     /**
      * Starts heartbeating to the cluster. May only be called if the instance
      * was constructed for a clustered environment.
@@ -2247,9 +2248,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
      * Sets whether this instance is clustered. Clustered means that a node is
      * either connected or trying to connect to the cluster.
      *
-     * @param clustered true if clustered
+     * @param clustered         true if clustered
      * @param clusterInstanceId if clustered is true, indicates the InstanceID
-     * of the Cluster Manager
+     *                          of the Cluster Manager
      */
     public void setClustered(final boolean clustered, final String clusterInstanceId) {
         writeLock.lock();
@@ -2462,7 +2463,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
 
             final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
-                provEvent.getPreviousContentClaimIdentifier(), false, false);
+                    provEvent.getPreviousContentClaimIdentifier(), false, false);
             claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
             offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
             size = provEvent.getPreviousFileSize();
@@ -2472,7 +2473,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
 
             final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
-                provEvent.getContentClaimIdentifier(), false, false);
+                    provEvent.getContentClaimIdentifier(), false, false);
 
             claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
             offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
@@ -2653,10 +2654,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         // Claim. If, for instance, we are simply creating the claim to request its content, as in #getContentAvailability, etc.
         // then this is not necessary.
         ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(),
-            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier());
+                event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier());
         if (resourceClaim == null) {
             resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
+                    event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
         }
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
@@ -2854,7 +2855,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     }
 
 
-
     public Integer getRemoteSiteListeningPort() {
         return remoteInputSocketPort;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index dcd0f32..d81ead3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,22 +16,6 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
-import org.apache.nifi.wali.SnapshotCapture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SyncListener;
-import org.wali.WriteAheadRepository;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -58,6 +42,21 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
+import org.apache.nifi.wali.SnapshotCapture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.SyncListener;
+import org.wali.WriteAheadRepository;
 
 /**
  * <p>
@@ -83,34 +82,35 @@ import java.util.stream.Collectors;
  * </p>
  */
 public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
-    private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
+    static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
     private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
 
-    private static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
+    static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
+    static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
     private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog";
     private static final String DEFAULT_WAL_IMPLEMENTATION = SEQUENTIAL_ACCESS_WAL;
 
-    private final String walImplementation;
-    private final NiFiProperties nifiProperties;
+    final String walImplementation;
+    protected final NiFiProperties nifiProperties;
 
-    private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
+    final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
     private final boolean alwaysSync;
 
     private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
-    private volatile ScheduledFuture<?> checkpointFuture;
+    volatile ScheduledFuture<?> checkpointFuture;
 
-    private final long checkpointDelayMillis;
+    final long checkpointDelayMillis;
     private final List<File> flowFileRepositoryPaths = new ArrayList<>();
-    private final List<File> recoveryFiles = new ArrayList<>();
+    final List<File> recoveryFiles = new ArrayList<>();
     private final int numPartitions;
-    private final ScheduledExecutorService checkpointExecutor;
+    final ScheduledExecutorService checkpointExecutor;
 
-    private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
+    final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself
 
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
-    private RepositoryRecordSerdeFactory serdeFactory;
-    private ResourceClaimManager claimManager;
+    RepositoryRecordSerdeFactory serdeFactory;
+    ResourceClaimManager claimManager;
 
     // WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk.
     // We keep track of this because we need to ensure that the ContentClaims are destroyed only after the FlowFile Repository has been
@@ -158,9 +158,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
         this.walImplementation = writeAheadLogImpl;
 
-        // We used to use one implementation of the write-ahead log, but we now want to use the other, we must address this. Since the
-        // MinimalLockingWriteAheadLog supports multiple partitions, we need to ensure that we recover records from all
-        // partitions, so we build up a List of Files for the recovery files.
+        // We used to use one implementation (minimal locking) of the write-ahead log, but we now want to use the other
+        // (sequential access), we must address this. Since the MinimalLockingWriteAheadLog supports multiple partitions,
+        // we need to ensure that we recover records from all partitions, so we build up a List of Files for the
+        // recovery files.
         for (final String propertyName : nifiProperties.getPropertyKeys()) {
             if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
                 final String dirName = nifiProperties.getProperty(propertyName);
@@ -168,7 +169,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             }
         }
 
-        if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
+        if (isSequentialAccessWAL(walImplementation)) {
             final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
             flowFileRepositoryPaths.add(new File(directoryName));
         } else {
@@ -182,12 +183,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
     }
 
+    /**
+     * Returns true if the provided implementation is a sequential access write ahead log (plaintext or encrypted).
+     *
+     * @param walImplementation the implementation to check
+     * @return true if this implementation is sequential access
+     */
+    private static boolean isSequentialAccessWAL(String walImplementation) {
+        return walImplementation.equals(SEQUENTIAL_ACCESS_WAL) || walImplementation.equals(ENCRYPTED_SEQUENTIAL_ACCESS_WAL);
+    }
+
     @Override
     public void initialize(final ResourceClaimManager claimManager) throws IOException {
         initialize(claimManager, new StandardRepositoryRecordSerdeFactory(claimManager));
     }
 
-    protected void initialize(final ResourceClaimManager claimManager, final RepositoryRecordSerdeFactory serdeFactory) throws IOException {
+    public void initialize(final ResourceClaimManager claimManager, final RepositoryRecordSerdeFactory serdeFactory) throws IOException {
         this.claimManager = claimManager;
 
         for (final File file : flowFileRepositoryPaths) {
@@ -200,17 +211,19 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
         this.serdeFactory = serdeFactory;
 
-        if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
+        // The specified implementation can be plaintext or encrypted; the only difference is the serde factory
+        if (isSequentialAccessWAL(walImplementation)) {
+            // TODO: May need to instantiate ESAWAL for clarity?
             wal = new SequentialAccessWriteAheadLog<>(flowFileRepositoryPaths.get(0), serdeFactory, this);
         } else if (walImplementation.equals(MINIMAL_LOCKING_WALI)) {
             final SortedSet<Path> paths = flowFileRepositoryPaths.stream()
-                .map(File::toPath)
-                .collect(Collectors.toCollection(TreeSet::new));
+                    .map(File::toPath)
+                    .collect(Collectors.toCollection(TreeSet::new));
 
             wal = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serdeFactory, this);
         } else {
             throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property '" + WRITE_AHEAD_LOG_IMPL + "' has an invalid value of '" + walImplementation
-                + "'. Please update nifi.properties to indicate a valid value for this property.");
+                    + "'. Please update nifi.properties to indicate a valid value for this property.");
         }
 
         logger.info("Initialized FlowFile Repository using {} partitions", numPartitions);
@@ -233,7 +246,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     @Override
     public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(final Set<ResourceClaim> resourceClaims, final FlowFileSwapManager swapManager) throws IOException {
-        if (!(wal instanceof SequentialAccessWriteAheadLog)) {
+        if (!(isSequentialAccessWAL(walImplementation))) {
             return null;
         }
 
@@ -299,7 +312,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
             @Override
             public String toString() {
-                return "Swap File[location=" +  getSwapLocation() + ", queue=" + getQueueIdentifier() + "]";
+                return "Swap File[location=" + getSwapLocation() + ", queue=" + getQueueIdentifier() + "]";
             }
 
             @Override
@@ -450,7 +463,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
         for (final RepositoryRecord record : records) {
             if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING
-                && record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS && record.getDestination() == null) {
+                    && record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS && record.getDestination() == null) {
                 throw new IllegalArgumentException("Record " + record + " has no destination and Type is " + record.getType());
             }
         }
@@ -458,7 +471,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Partition records by whether or not their type is 'CLEANUP_TRANSIENT_CLAIMS'. We do this because we don't want to send
         // these types of records to the Write-Ahead Log.
         final Map<Boolean, List<RepositoryRecord>> partitionedRecords = records.stream()
-            .collect(Collectors.partitioningBy(record -> record.getType() == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS));
+                .collect(Collectors.partitioningBy(record -> record.getType() == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS));
 
         List<RepositoryRecord> recordsForWal = partitionedRecords.get(Boolean.FALSE);
         if (recordsForWal == null) {
@@ -470,7 +483,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         updateContentClaims(records, partitionIndex);
     }
 
-    private void updateContentClaims(Collection<RepositoryRecord> repositoryRecords, final int partitionIndex) {
+    protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecords, final int partitionIndex) {
         // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful.
         // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim,
         // it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content
@@ -597,7 +610,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     @Override
     public void onSync(final int partitionIndex) {
-        final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
+        final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionIndex);
         if (claimQueue == null) {
             return;
         }
@@ -627,7 +640,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
      * the specified Swap File and returns the number of FlowFiles that were
      * persisted.
      *
-     * @param queue queue to swap out
+     * @param queue        queue to swap out
      * @param swapLocation location to swap to
      * @throws IOException ioe
      */
@@ -654,7 +667,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
         }
 
-        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation});
+        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", swappedOut.size(), queue, swapLocation);
     }
 
     @Override
@@ -678,7 +691,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
     }
 
-    private void deleteRecursively(final File dir) {
+    void deleteRecursively(final File dir) {
         final File[] children = dir.listFiles();
 
         if (children != null) {
@@ -747,11 +760,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
 
         logger.info("Encountered FlowFile Repository that was written using the 'Minimal Locking Write-Ahead Log'. "
-            + "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");
+                + "Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");
 
         final SortedSet<Path> paths = recoveryFiles.stream()
-            .map(File::toPath)
-            .collect(Collectors.toCollection(TreeSet::new));
+                .map(File::toPath)
+                .collect(Collectors.toCollection(TreeSet::new));
 
         final Collection<RepositoryRecord> recordList;
         final MinimalLockingWriteAheadLog<RepositoryRecord> minimalLockingWal = new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), serdeFactory, null);
@@ -808,7 +821,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Since these implementations do not write to the same files, they will not interfere with one another. If we do recover records,
         // then we will update the new WAL (with fsync()) and delete the old repository so that we won't recover it again.
         if (recordList == null || recordList.isEmpty()) {
-            if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
+            if (isSequentialAccessWAL(walImplementation)) {
                 // Configured to use Sequential Access WAL but it has no records. Check if there are records in
                 // a MinimalLockingWriteAheadLog that we can recover.
                 recordList = migrateFromMinimalLockingLog(wal).orElse(new ArrayList<>());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
index cede0f9..a53df71 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
@@ -23,17 +23,14 @@ import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.security.KeyManagementException;
 import javax.crypto.CipherOutputStream;
-import javax.crypto.SecretKey;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.FileSystemRepository;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.security.kms.CryptoUtils;
 import org.apache.nifi.security.kms.EncryptionException;
 import org.apache.nifi.security.kms.KeyProvider;
-import org.apache.nifi.security.kms.KeyProviderFactory;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
 import org.apache.nifi.security.repository.RepositoryType;
-import org.apache.nifi.security.repository.config.RepositoryEncryptionConfiguration;
 import org.apache.nifi.security.repository.stream.RepositoryObjectStreamEncryptor;
 import org.apache.nifi.security.repository.stream.aes.RepositoryObjectAESCTREncryptor;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
@@ -69,61 +66,14 @@ public class EncryptedFileSystemRepository extends FileSystemRepository {
     public EncryptedFileSystemRepository(final NiFiProperties niFiProperties) throws IOException {
         super(niFiProperties);
 
-        // Initialize key provider
-        initializeEncryptionServices(niFiProperties);
-    }
-
-    private void initializeEncryptionServices(NiFiProperties niFiProperties) throws IOException {
         // Initialize the encryption-specific fields
-        if (CryptoUtils.isContentRepositoryEncryptionConfigured(niFiProperties)) {
-            try {
-                KeyProvider keyProvider;
-                final String keyProviderImplementation = niFiProperties.getProperty(NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS);
-                if (KeyProviderFactory.requiresMasterKey(keyProviderImplementation)) {
-                    SecretKey masterKey = CryptoUtils.getMasterKey();
-                    keyProvider = buildKeyProvider(niFiProperties, masterKey);
-                } else {
-                    keyProvider = buildKeyProvider(niFiProperties);
-                }
-                this.keyProvider = keyProvider;
-            } catch (KeyManagementException e) {
-                String msg = "Encountered an error building the key provider";
-                logger.error(msg, e);
-                throw new IOException(msg, e);
-            }
-        } else {
-            throw new IOException("The provided configuration does not support a encrypted repository");
-        }
+        this.keyProvider = RepositoryEncryptorUtils.validateAndBuildRepositoryKeyProvider(niFiProperties, RepositoryType.CONTENT);
+
         // Set active key ID
         setActiveKeyId(niFiProperties.getContentRepositoryEncryptionKeyId());
     }
 
     /**
-     * Returns a configured {@link KeyProvider} instance that does not require a {@code master key} to use (usually a {@link org.apache.nifi.security.kms.StaticKeyProvider}).
-     *
-     * @param niFiProperties the {@link NiFiProperties} object
-     * @return the configured KeyProvider
-     * @throws KeyManagementException if there is a problem with the configuration
-     */
-    private static KeyProvider buildKeyProvider(NiFiProperties niFiProperties) throws KeyManagementException {
-        return buildKeyProvider(niFiProperties, null);
-    }
-
-    /**
-     * Returns a configured {@link KeyProvider} instance that requires a {@code master key} to use
-     * (usually a {@link org.apache.nifi.security.kms.FileBasedKeyProvider} or an encrypted
-     * {@link org.apache.nifi.security.kms.StaticKeyProvider}).
-     *
-     * @param niFiProperties the {@link NiFiProperties} object
-     * @param masterKey      the master encryption key used to encrypt the data encryption keys in the key provider configuration
-     * @return the configured KeyProvider
-     * @throws KeyManagementException if there is a problem with the configuration
-     */
-    private static KeyProvider buildKeyProvider(NiFiProperties niFiProperties, SecretKey masterKey) throws KeyManagementException {
-        return KeyProviderFactory.buildKeyProvider(RepositoryEncryptionConfiguration.fromNiFiProperties(niFiProperties, RepositoryType.CONTENT), masterKey);
-    }
-
-    /**
      * Returns the number of bytes read after importing content from the provided
      * {@link InputStream} into the {@link ContentClaim}. This method has the same logic as
      * the parent method, but must be overridden to use the subclass's
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLog.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLog.java
new file mode 100644
index 0000000..05c78f3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLog.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali;
+
+import java.io.File;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDeFactory;
+import org.wali.SyncListener;
+
+/**
+ * <p>
+ * This implementation of {@link org.wali.WriteAheadRepository} is just a marker implementation wrapping the {@link SequentialAccessWriteAheadLog}. It exists to allow
+ * users to configure {@code nifi.properties} with
+ * {@code nifi.flowfile.repository.wal.implementation=org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog}
+ * because the {@link org.wali.SerDe} interface is not exposed at that level. By selecting
+ * this WAL implementation, the admin is enabling the encrypted flowfile repository, but all
+ * other behavior is identical.
+ * </p>
+ *
+ * <p>
+ * This implementation transparently encrypts the objects as they are persisted to the journal file.
+ * </p>
+ */
+public class EncryptedSequentialAccessWriteAheadLog<T> extends SequentialAccessWriteAheadLog<T> {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLog.class);
+
+
+    public EncryptedSequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory) throws IOException {
+        this(storageDirectory, serdeFactory, SyncListener.NOP_SYNC_LISTENER);
+    }
+
+    public EncryptedSequentialAccessWriteAheadLog(final File storageDirectory, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
+        super(storageDirectory, serdeFactory, syncListener);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
new file mode 100644
index 0000000..3c978f1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.wali
+
+import ch.qos.logback.classic.Level
+import org.apache.nifi.controller.queue.FlowFileQueue
+import org.apache.nifi.controller.repository.EncryptedSchemaRepositoryRecordSerde
+import org.apache.nifi.controller.repository.RepositoryRecord
+import org.apache.nifi.controller.repository.RepositoryRecordType
+import org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde
+import org.apache.nifi.controller.repository.StandardFlowFileRecord
+import org.apache.nifi.controller.repository.StandardRepositoryRecord
+import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
+import org.apache.nifi.security.kms.CryptoUtils
+import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TestName
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.wali.SerDe
+import org.wali.SerDeFactory
+import org.wali.SingletonSerDeFactory
+
+import java.security.Security
+
+import static org.apache.nifi.security.kms.CryptoUtils.STATIC_KEY_PROVIDER_CLASS_NAME
+import static org.junit.Assert.assertNotNull
+
+@RunWith(JUnit4.class)
+class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSequentialAccessWriteAheadLogTest.class)
+
+    private static Level ORIGINAL_REPO_LOG_LEVEL
+    private static Level ORIGINAL_TEST_LOG_LEVEL
+    private static final String REPO_LOG_PACKAGE = "org.apache.nifi.security.repository"
+
+    public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"
+
+    private ResourceClaimManager claimManager
+    private Map<String, FlowFileQueue> queueMap
+    private FlowFileQueue flowFileQueue
+    private ByteArrayOutputStream byteArrayOutputStream
+    private DataOutputStream dataOutputStream
+
+    // TODO: Mock the wrapped serde
+    // TODO: Make integration test with real wrapped serde
+    private SerDe<RepositoryRecord> wrappedSerDe
+
+    private static final String KPI = STATIC_KEY_PROVIDER_CLASS_NAME
+    private static final String KPL = ""
+    private static final String KEY_ID = "K1"
+    private static final Map<String, String> KEYS = [K1: "0123456789ABCDEFFEDCBA98765432100123456789ABCDEFFEDCBA9876543210"]
+    // TODO: Change to WAL impl name
+    private static final String REPO_IMPL = CryptoUtils.EWAFFR_CLASS_NAME
+
+    private FlowFileRepositoryEncryptionConfiguration flowFileREC
+
+    private EncryptedSchemaRepositoryRecordSerde esrrs
+
+    private final EncryptedSequentialAccessWriteAheadLog<RepositoryRecord> encryptedWAL
+
+    @Rule
+    public TestName testName = new TestName()
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.debug("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+        claimManager = new StandardResourceClaimManager()
+        queueMap = [:]
+        flowFileQueue = createAndRegisterMockQueue(TEST_QUEUE_IDENTIFIER)
+        byteArrayOutputStream = new ByteArrayOutputStream()
+        dataOutputStream = new DataOutputStream(byteArrayOutputStream)
+        wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager)
+        wrappedSerDe.setQueueMap(queueMap)
+
+        flowFileREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, KEYS, REPO_IMPL)
+
+        esrrs = new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, flowFileREC)
+    }
+
+    @After
+    void tearDown() throws Exception {
+        claimManager.purge()
+        queueMap.clear()
+    }
+
+    private FlowFileQueue createMockQueue(String identifier = testName.methodName + new Date().toString()) {
+        [getIdentifier: { ->
+            logger.mock("Retrieving flowfile queue identifier: ${identifier}" as String)
+            identifier
+        }] as FlowFileQueue
+    }
+
+    private FlowFileQueue createAndRegisterMockQueue(String identifier = testName.methodName + new Date().toString()) {
+        FlowFileQueue queue = createMockQueue(identifier)
+        queueMap.put(identifier, queue)
+        queue
+    }
+
+    private RepositoryRecord buildCreateRecord(FlowFileQueue queue, Map<String, String> attributes = [:]) {
+        StandardRepositoryRecord record = new StandardRepositoryRecord(queue)
+        StandardFlowFileRecord.Builder ffrb = new StandardFlowFileRecord.Builder().id(System.nanoTime())
+        ffrb.addAttributes([uuid: getMockUUID()] + attributes as Map<String, String>)
+        record.setWorking(ffrb.build())
+        record
+    }
+
+    private String getMockUUID() {
+        "${testName.methodName ?: "no_test"}@${new Date().format("mmssSSS")}" as String
+    }
+
+    /** This test creates flowfile records, adds them to the repository, and then recovers them to ensure they were persisted */
+    @Test
+    void testShouldUpdateWithExternalFile() {
+        // Arrange
+        final EncryptedSchemaRepositoryRecordSerde encryptedSerde = buildEncryptedSerDe()
+
+        final SequentialAccessWriteAheadLog<RepositoryRecord> repo = createWriteRepo(encryptedSerde)
+
+        final List<RepositoryRecord> records = new ArrayList<>()
+        10.times { int i ->
+            def attributes = [name: "User ${i}" as String, age: "${i}" as String]
+            final RepositoryRecord record = buildCreateRecord(flowFileQueue, attributes)
+            records.add(record)
+        }
+
+        // Act
+        repo.update(records, false)
+        repo.shutdown()
+
+        // Assert
+        final SequentialAccessWriteAheadLog<RepositoryRecord> recoveryRepo = createRecoveryRepo()
+        final Collection<RepositoryRecord> recovered = recoveryRepo.recoverRecords()
+
+        // Ensure that the same records (except now UPDATE instead of CREATE) are returned (order is not guaranteed)
+        assert recovered.size() == records.size()
+        assert recovered.every { it.type == RepositoryRecordType.UPDATE }
+
+        // Check that all attributes (flowfile record) in the recovered records were present in the original list
+        assert recovered.every { (it as StandardRepositoryRecord).current in records*.current }
+    }
+
+    /** This test creates flowfile records, adds them to the repository, and then recovers them to ensure they were persisted */
+    @Test
+    void testShouldUpdateWithExternalFileAfterCheckpoint() {
+        // Arrange
+        final EncryptedSchemaRepositoryRecordSerde encryptedSerde = buildEncryptedSerDe()
+
+        final SequentialAccessWriteAheadLog<RepositoryRecord> repo = createWriteRepo(encryptedSerde)
+
+        // Turn off debugging because of the high volume
+        logger.debug("Temporarily turning off DEBUG logging")
+        def encryptorLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(REPO_LOG_PACKAGE)
+        ORIGINAL_REPO_LOG_LEVEL = encryptorLogger.getLevel()
+        encryptorLogger.setLevel(Level.INFO)
+
+        def testLogger = (ch.qos.logback.classic.Logger) logger
+        ORIGINAL_TEST_LOG_LEVEL = testLogger.getLevel()
+        testLogger.setLevel(Level.INFO)
+
+        final List<RepositoryRecord> records = new ArrayList<>()
+        100_000.times { int i ->
+            def attributes = [name: "User ${i}" as String, age: "${i}" as String]
+            final RepositoryRecord record = buildCreateRecord(flowFileQueue, attributes)
+            records.add(record)
+        }
+
+        // Act
+        repo.update(records, false)
+        repo.shutdown()
+
+        // Assert
+        final SequentialAccessWriteAheadLog<RepositoryRecord> recoveryRepo = createRecoveryRepo()
+        final Collection<RepositoryRecord> recovered = recoveryRepo.recoverRecords()
+
+        // Ensure that the same records (except now UPDATE instead of CREATE) are returned (order is not guaranteed)
+        assert recovered.size() == records.size()
+        assert recovered.every { it.type == RepositoryRecordType.UPDATE }
+
+        // Reset log level
+        encryptorLogger.setLevel(ORIGINAL_REPO_LOG_LEVEL)
+        testLogger.setLevel(ORIGINAL_TEST_LOG_LEVEL)
+        logger.debug("Re-enabled DEBUG logging")
+    }
+
+    private EncryptedSchemaRepositoryRecordSerde buildEncryptedSerDe(FlowFileRepositoryEncryptionConfiguration ffrec = flowFileREC) {
+        final StandardRepositoryRecordSerdeFactory factory = new StandardRepositoryRecordSerdeFactory(claimManager)
+        SchemaRepositoryRecordSerde wrappedSerDe = factory.createSerDe() as SchemaRepositoryRecordSerde
+        wrappedSerDe.setQueueMap(queueMap)
+        return new EncryptedSchemaRepositoryRecordSerde(wrappedSerDe, ffrec)
+    }
+
+    private SequentialAccessWriteAheadLog<RepositoryRecord> createWriteRepo() throws IOException {
+        return createWriteRepo(buildEncryptedSerDe())
+    }
+
+    private SequentialAccessWriteAheadLog<RepositoryRecord> createWriteRepo(final SerDe<RepositoryRecord> serde) throws IOException {
+        final File targetDir = new File("target")
+        final File storageDir = new File(targetDir, testName?.methodName ?: "unknown_test")
+        deleteRecursively(storageDir)
+        assertTrue(storageDir.mkdirs())
+
+        final SerDeFactory<RepositoryRecord> serdeFactory = new SingletonSerDeFactory<>(serde)
+        final SequentialAccessWriteAheadLog<RepositoryRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory)
+
+        final Collection<RepositoryRecord> recovered = repo.recoverRecords()
+        assertNotNull(recovered)
+        assertTrue(recovered.isEmpty())
+
+        return repo
+    }
+
+    private SequentialAccessWriteAheadLog<RepositoryRecord> createRecoveryRepo() throws IOException {
+        final File targetDir = new File("target")
+        final File storageDir = new File(targetDir, testName?.methodName ?: "unknown_test")
+
+        final SerDe<RepositoryRecord> serde = buildEncryptedSerDe()
+        final SerDeFactory<RepositoryRecord> serdeFactory = new SingletonSerDeFactory<>(serde)
+        final SequentialAccessWriteAheadLog<RepositoryRecord> repo = new SequentialAccessWriteAheadLog<>(storageDir, serdeFactory)
+
+        return repo
+    }
+
+    private void deleteRecursively(final File file) {
+        final File[] children = file.listFiles()
+        if (children != null) {
+            for (final File child : children) {
+                deleteRecursively(child)
+            }
+        }
+
+        file.delete()
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy
index 155ca7b..a1ed963 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-properties-loader/src/test/groovy/org/apache/nifi/properties/StandardNiFiPropertiesGroovyTest.groovy
@@ -38,6 +38,9 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase {
     private static final String CREK = NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY
     private static final String CREKID = NiFiProperties.CONTENT_REPOSITORY_ENCRYPTION_KEY_ID
 
+    private static final String FFREK = NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY
+    private static final String FFREKID = NiFiProperties.FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID
+
     @BeforeClass
     static void setUpOnce() throws Exception {
         logger.metaClass.methodMissing = { String name, args ->
@@ -212,6 +215,222 @@ class StandardNiFiPropertiesGroovyTest extends GroovyTestCase {
     }
 
     @Test
+    void testShouldGetFlowFileRepoEncryptionKeysFromMultiplePropertiesAfterMigration() throws Exception {
+        // Arrange
+        Properties rawProperties = new Properties()
+        final String KEY_ID = "K1"
+        final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210"
+        final String KEY_ID_2 = "K2"
+        final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111"
+
+        /**
+         * This simulates after the initial key migration (K1 -> K2). The {@code nifi.properties} will look like:
+         *
+         * nifi.flowfile.repository.encryption.key.id=K2
+         * nifi.flowfile.repository.encryption.key=
+         * nifi.flowfile.repository.encryption.key.id.K1=0123456789ABCDEFFEDCBA9876543210
+         * nifi.flowfile.repository.encryption.key.id.K2=AAAABBBBCCCCDDDDEEEEFFFF00001111
+         */
+
+        rawProperties.setProperty(FFREKID, KEY_ID_2)
+        rawProperties.setProperty(FFREK, "")
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID}", KEY_HEX)
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID_2}", KEY_HEX_2)
+        NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties)
+        logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}")
+
+        // Act
+        def keyId = niFiProperties.getFlowFileRepoEncryptionKeyId()
+        def key = niFiProperties.getFlowFileRepoEncryptionKey()
+        def keys = niFiProperties.getFlowFileRepoEncryptionKeys()
+
+        logger.info("Retrieved key ID: ${keyId}")
+        logger.info("Retrieved key: ${key}")
+        logger.info("Retrieved keys: ${keys}")
+
+        // Assert
+        assert keyId == KEY_ID_2
+        assert key == KEY_HEX_2
+        assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_2]
+    }
+
+    @Test
+    void testShouldGetFlowFileRepoEncryptionKeysFromMultiplePropertiesWithoutExplicitKey() throws Exception {
+        // Arrange
+        Properties rawProperties = new Properties()
+        final String KEY_ID = "K1"
+        final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210"
+        final String KEY_ID_2 = "K2"
+        final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111"
+
+        /**
+         * This simulates after the initial key migration (K1 -> K2). The {@code nifi.properties} will look like:
+         *
+         * (note no nifi.flowfile.repository.encryption.key=)
+         * nifi.flowfile.repository.encryption.key.id=K2
+         * nifi.flowfile.repository.encryption.key.id.K1=0123456789ABCDEFFEDCBA9876543210
+         * nifi.flowfile.repository.encryption.key.id.K2=AAAABBBBCCCCDDDDEEEEFFFF00001111
+         */
+
+        rawProperties.setProperty(FFREKID, KEY_ID_2)
+//        rawProperties.setProperty(FFREK, "")
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID}", KEY_HEX)
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID_2}", KEY_HEX_2)
+        NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties)
+        logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}")
+
+        // Act
+        def keyId = niFiProperties.getFlowFileRepoEncryptionKeyId()
+        def key = niFiProperties.getFlowFileRepoEncryptionKey()
+        def keys = niFiProperties.getFlowFileRepoEncryptionKeys()
+
+        logger.info("Retrieved key ID: ${keyId}")
+        logger.info("Retrieved key: ${key}")
+        logger.info("Retrieved keys: ${keys}")
+
+        // Assert
+        assert keyId == KEY_ID_2
+        assert key == KEY_HEX_2
+        assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_2]
+    }
+
+    @Test
+    void testGetFlowFileRepoEncryptionKeysShouldWarnOnMisformattedProperties() throws Exception {
+        // Arrange
+        Properties rawProperties = new Properties()
+        final String KEY_ID = "K1"
+        final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210"
+        final String KEY_ID_2 = "K2"
+        final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111"
+
+        /**
+         * This simulates after the initial key migration (K1 -> K2) when the admin has mistyped. The {@code nifi.properties} will look like:
+         *
+         * (note no nifi.flowfile.repository.encryption.key=)
+         * nifi.flowfile.repository.encryption.key.id=K2
+         * nifi.flowfile.repository.encryption.key.K1=0123456789ABCDEFFEDCBA9876543210
+         * nifi.flowfile.repository.encryption.key.K2=AAAABBBBCCCCDDDDEEEEFFFF00001111
+         *
+         * The above properties should have ...key.id.K1= but they are missing the "id" segment
+         */
+
+        rawProperties.setProperty(FFREKID, KEY_ID_2)
+//        rawProperties.setProperty(FFREK, "")
+        rawProperties.setProperty("${FFREK}.${KEY_ID}", KEY_HEX)
+        rawProperties.setProperty("${FFREK}.${KEY_ID_2}", KEY_HEX_2)
+        NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties)
+        logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}")
+
+        // Act
+        def keyId = niFiProperties.getFlowFileRepoEncryptionKeyId()
+        def key = niFiProperties.getFlowFileRepoEncryptionKey()
+        def keys = niFiProperties.getFlowFileRepoEncryptionKeys()
+
+        logger.info("Retrieved key ID: ${keyId}")
+        logger.info("Retrieved key: ${key}")
+        logger.info("Retrieved keys: ${keys}")
+
+        // Assert
+        assert keyId == KEY_ID_2
+        assert !key
+        assert keys == [:]
+    }
+
+    @Test
+    void testShouldGetFlowFileRepoEncryptionKeysFromMultiplePropertiesWithDuplicates() throws Exception {
+        // Arrange
+        Properties rawProperties = new Properties()
+        final String KEY_ID = "K1"
+        final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210"
+        final String KEY_ID_2 = "K2"
+        final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111"
+        final String KEY_HEX_DUP = "AA" * 16
+
+        /**
+         * This simulates after the initial key migration (K1 -> K2) with a mistaken duplication. The
+         * {@code nifi.properties} will look like:
+         *
+         * nifi.flowfile.repository.encryption.key.id=K2
+         * nifi.flowfile.repository.encryption.key=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+         * nifi.flowfile.repository.encryption.key.id.K1=0123456789ABCDEFFEDCBA9876543210
+         * nifi.flowfile.repository.encryption.key.id.K2=AAAABBBBCCCCDDDDEEEEFFFF00001111
+         *
+         * The value of K2 will be AAAA..., overriding key.K2 as .key will always win
+         */
+
+        /**
+         * The properties loading code will print a warning if it detects duplicates, but will not stop execution
+         */
+        rawProperties.setProperty(FFREKID, KEY_ID_2)
+        rawProperties.setProperty(FFREK, KEY_HEX_DUP)
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID}", KEY_HEX)
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID_2}", KEY_HEX_2)
+        NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties)
+        logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}")
+
+        // Act
+        def keyId = niFiProperties.getFlowFileRepoEncryptionKeyId()
+        def key = niFiProperties.getFlowFileRepoEncryptionKey()
+        def keys = niFiProperties.getFlowFileRepoEncryptionKeys()
+
+        logger.info("Retrieved key ID: ${keyId}")
+        logger.info("Retrieved key: ${key}")
+        logger.info("Retrieved keys: ${keys}")
+
+        // Assert
+        assert keyId == KEY_ID_2
+        assert key == KEY_HEX_2
+        assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_DUP]
+    }
+
+    @Test
+    void testShouldGetFlowFileRepoEncryptionKeysFromMultiplePropertiesWithDuplicatesInReverseOrder() throws Exception {
+        // Arrange
+        Properties rawProperties = new Properties()
+        final String KEY_ID = "K1"
+        final String KEY_HEX = "0123456789ABCDEFFEDCBA9876543210"
+        final String KEY_ID_2 = "K2"
+        final String KEY_HEX_2 = "AAAABBBBCCCCDDDDEEEEFFFF00001111"
+        final String KEY_HEX_DUP = "AA" * 16
+
+        /**
+         * This simulates after the initial key migration (K1 -> K2) with a mistaken duplication. The
+         * {@code nifi.properties} will look like:
+         *
+         * nifi.flowfile.repository.encryption.key.id.K1=0123456789ABCDEFFEDCBA9876543210
+         * nifi.flowfile.repository.encryption.key.id.K2=AAAABBBBCCCCDDDDEEEEFFFF00001111
+         * nifi.flowfile.repository.encryption.key=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+         * nifi.flowfile.repository.encryption.key.id=K2
+         *
+         * The value of K2 will be AAAA..., overriding key.K2 as .key will always win
+         */
+
+        /**
+         * The properties loading code will print a warning if it detects duplicates, but will not stop execution
+         */
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID_2}", KEY_HEX_2)
+        rawProperties.setProperty("${FFREK}.id.${KEY_ID}", KEY_HEX)
+        rawProperties.setProperty(FFREK, KEY_HEX_DUP)
+        rawProperties.setProperty(FFREKID, KEY_ID_2)
+        NiFiProperties niFiProperties = new StandardNiFiProperties(rawProperties)
+        logger.info("niFiProperties has ${niFiProperties.size()} properties: ${niFiProperties.getPropertyKeys()}")
+
+        // Act
+        def keyId = niFiProperties.getFlowFileRepoEncryptionKeyId()
+        def key = niFiProperties.getFlowFileRepoEncryptionKey()
+        def keys = niFiProperties.getFlowFileRepoEncryptionKeys()
+
+        logger.info("Retrieved key ID: ${keyId}")
+        logger.info("Retrieved key: ${key}")
+        logger.info("Retrieved keys: ${keys}")
+
+        // Assert
+        assert keyId == KEY_ID_2
+        assert key == KEY_HEX_2
+        assert keys == [(KEY_ID): KEY_HEX, (KEY_ID_2): KEY_HEX_DUP]
+    }
+
+    @Test
     void testShouldGetProvenanceRepoEncryptionKeysWithNoDefaultDefined() throws Exception {
         // Arrange
         Properties rawProperties = new Properties()
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 81e5343..0edcd23 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -59,6 +59,10 @@
         <nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions>
         <nifi.flowfile.repository.checkpoint.interval>2 mins</nifi.flowfile.repository.checkpoint.interval>
         <nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync>
+        <nifi.flowfile.repository.encryption.key.provider.implementation />
+        <nifi.flowfile.repository.encryption.key.provider.location />
+        <nifi.flowfile.repository.encryption.key.id />
+        <nifi.flowfile.repository.encryption.key />
         <nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
         <nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
         <nifi.swap.in.period>5 sec</nifi.swap.in.period>
@@ -74,6 +78,10 @@
         <nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
         <nifi.content.repository.archive.enabled>true</nifi.content.repository.archive.enabled>
         <nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
+        <nifi.content.repository.encryption.key.provider.implementation />
+        <nifi.content.repository.encryption.key.provider.location />
+        <nifi.content.repository.encryption.key.id />
+        <nifi.content.repository.encryption.key />
         <nifi.content.viewer.url>../nifi-content-viewer/</nifi.content.viewer.url>
 
         <nifi.restore.directory />
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 617e722..03518ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -64,6 +64,10 @@ nifi.flowfile.repository.directory=${nifi.flowfile.repository.directory}
 nifi.flowfile.repository.partitions=${nifi.flowfile.repository.partitions}
 nifi.flowfile.repository.checkpoint.interval=${nifi.flowfile.repository.checkpoint.interval}
 nifi.flowfile.repository.always.sync=${nifi.flowfile.repository.always.sync}
+nifi.flowfile.repository.encryption.key.provider.implementation=${nifi.flowfile.repository.encryption.key.provider.implementation}
+nifi.flowfile.repository.encryption.key.provider.location=${nifi.flowfile.repository.encryption.key.provider.location}
+nifi.flowfile.repository.encryption.key.id=${nifi.flowfile.repository.encryption.key.id}
+nifi.flowfile.repository.encryption.key=${nifi.flowfile.repository.encryption.key}
 
 nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
 nifi.queue.swap.threshold=${nifi.queue.swap.threshold}
@@ -82,6 +86,10 @@ nifi.content.repository.archive.max.usage.percentage=${nifi.content.repository.a
 nifi.content.repository.archive.enabled=${nifi.content.repository.archive.enabled}
 nifi.content.repository.always.sync=${nifi.content.repository.always.sync}
 nifi.content.viewer.url=${nifi.content.viewer.url}
+nifi.content.repository.encryption.key.provider.implementation=${nifi.content.repository.encryption.key.provider.implementation}
+nifi.content.repository.encryption.key.provider.location=${nifi.content.repository.encryption.key.provider.location}
+nifi.content.repository.encryption.key.id=${nifi.content.repository.encryption.key.id}
+nifi.content.repository.encryption.key=${nifi.content.repository.encryption.key}
 
 # Provenance Repository Properties
 nifi.provenance.repository.implementation=${nifi.provenance.repository.implementation}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 1c8e33b..b04bca7 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -52,6 +52,7 @@ public class RepositoryConfiguration {
     private int maxAttributeChars = 65536;
     private int debugFrequency = 1_000_000;
 
+    // TODO: Delegate to RepositoryEncryptionConfiguration in NIFI-6617
     private Map<String, String> encryptionKeys;
     private String keyId;
     private String keyProviderImplementation;
@@ -494,6 +495,7 @@ public class RepositoryConfiguration {
 
         config.setDebugFrequency(nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY, config.getDebugFrequency()));
 
+        // TODO: Check for multiple key loading (NIFI-6617)
         // Encryption values may not be present but are only required for EncryptedWriteAheadProvenanceRepository
         final String implementationClassName = nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
         if (EncryptedWriteAheadProvenanceRepository.class.getName().equals(implementationClassName)) {
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 8355426..475c750 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -38,7 +38,8 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
 import org.apache.nifi.provenance.toc.StandardTocReader;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocUtil;
-import org.apache.nifi.security.kms.CryptoUtils;
+import org.apache.nifi.security.repository.RepositoryEncryptorUtils;
+import org.apache.nifi.security.repository.RepositoryType;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -195,7 +196,7 @@ public class RecordReaders {
         } else {
             try {
                 NiFiProperties niFiProperties = NiFiPropertiesLoader.loadDefaultWithKeyFromBootstrap();
-                isEncryptionAvailable = CryptoUtils.isProvenanceRepositoryEncryptionConfigured(niFiProperties);
+                isEncryptionAvailable = RepositoryEncryptorUtils.isRepositoryEncryptionConfigured(niFiProperties, RepositoryType.PROVENANCE);
                 encryptionPropertiesRead = true;
             } catch (IOException e) {
                 logger.error("Encountered an error checking the provenance repository encryption configuration: ", e);