Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/02 17:27:35 UTC

[10/13] nifi git commit: NIFI-3594 Implemented encrypted provenance repository. Added src/test/resources/logback-test.xml files resetting log level from DEBUG (in nifi-data-provenance-utils) to WARN because later tests depend on MockComponentLog recordin

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 7a2f57e..5a75172 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.util.FormatUtils;
@@ -50,6 +49,12 @@ public class RepositoryConfiguration {
     private int journalCount = 16;
     private int compressionBlockBytes = 1024 * 1024;
     private int maxAttributeChars = 65536;
+    private int debugFrequency = 1_000_000;
+
+    private Map<String, String> encryptionKeys;
+    private String keyId;
+    private String keyProviderImplementation;
+    private String keyProviderLocation;
 
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
@@ -360,6 +365,54 @@ public class RepositoryConfiguration {
         return Optional.ofNullable(warmCacheFrequencyMinutes);
     }
 
+    public boolean supportsEncryption() {
+        boolean keyProviderIsConfigured = CryptoUtils.isValidKeyProvider(keyProviderImplementation, keyProviderLocation, keyId, encryptionKeys);
+
+        return keyProviderIsConfigured;
+    }
+
+    public Map<String, String> getEncryptionKeys() {
+        return encryptionKeys;
+    }
+
+    public void setEncryptionKeys(Map<String, String> encryptionKeys) {
+        this.encryptionKeys = encryptionKeys;
+    }
+
+    public String getKeyId() {
+        return keyId;
+    }
+
+    public void setKeyId(String keyId) {
+        this.keyId = keyId;
+    }
+
+    public String getKeyProviderImplementation() {
+        return keyProviderImplementation;
+    }
+
+    public void setKeyProviderImplementation(String keyProviderImplementation) {
+        this.keyProviderImplementation = keyProviderImplementation;
+    }
+
+    public String getKeyProviderLocation() {
+        return keyProviderLocation;
+    }
+
+    public void setKeyProviderLocation(String keyProviderLocation) {
+        this.keyProviderLocation = keyProviderLocation;
+    }
+
+
+    public int getDebugFrequency() {
+        return debugFrequency;
+    }
+
+    public void setDebugFrequency(int debugFrequency) {
+        this.debugFrequency = debugFrequency;
+    }
+
+
     public static RepositoryConfiguration create(final NiFiProperties nifiProperties) {
         final Map<String, Path> storageDirectories = nifiProperties.getProvenanceRepositoryPaths();
         if (storageDirectories.isEmpty()) {
@@ -436,6 +489,17 @@ public class RepositoryConfiguration {
 
         config.setAlwaysSync(alwaysSync);
 
+        config.setDebugFrequency(nifiProperties.getIntegerProperty(NiFiProperties.PROVENANCE_REPO_DEBUG_FREQUENCY, config.getDebugFrequency()));
+
+        // Encryption values may not be present but are only required for EncryptedWriteAheadProvenanceRepository
+        final String implementationClassName = nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+        if (EncryptedWriteAheadProvenanceRepository.class.getName().equals(implementationClassName)) {
+            config.setEncryptionKeys(nifiProperties.getProvenanceRepoEncryptionKeys());
+            config.setKeyId(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID));
+            config.setKeyProviderImplementation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS));
+            config.setKeyProviderLocation(nifiProperties.getProperty(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_LOCATION));
+        }
+
         return config;
     }
 }
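
For reference, the values that create() reads above map to nifi.properties entries roughly as follows. This is a hedged illustration: the property names are inferred from the NiFiProperties constants referenced in the diff, and the key ID and hex key shown are placeholders, so the released nifi.properties and Admin Guide remain the authoritative source.

    nifi.provenance.repository.implementation=org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository
    nifi.provenance.repository.debug.frequency=1000000
    # Encryption settings are only consulted when the encrypted implementation above is selected
    nifi.provenance.repository.encryption.key.provider.implementation=org.apache.nifi.provenance.StaticKeyProvider
    nifi.provenance.repository.encryption.key.provider.location=
    nifi.provenance.repository.encryption.key.id=Key1
    nifi.provenance.repository.encryption.key=0123456789ABCDEFFEDCBA9876543210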

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
index 8975028..4782dbe 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -84,7 +83,7 @@ import org.slf4j.LoggerFactory;
  */
 public class WriteAheadProvenanceRepository implements ProvenanceRepository {
     private static final Logger logger = LoggerFactory.getLogger(WriteAheadProvenanceRepository.class);
-    private static final int BLOCK_SIZE = 1024 * 32;
+    static final int BLOCK_SIZE = 1024 * 32;
     public static final String EVENT_CATEGORY = "Provenance Repository";
 
     private final RepositoryConfiguration config;
@@ -129,6 +128,14 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
             }
         };
 
+       init(recordWriterFactory, recordReaderFactory, eventReporter, authorizer, resourceFactory);
+    }
+
+    synchronized void init(RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory,
+                           final EventReporter eventReporter, final Authorizer authorizer,
+                           final ProvenanceAuthorizableFactory resourceFactory) throws IOException {
+        final EventFileManager fileManager = new EventFileManager();
+
         eventStore = new PartitionedWriteAheadEventStore(config, recordWriterFactory, recordReaderFactory, eventReporter, fileManager);
 
         final IndexManager indexManager = new SimpleIndexManager(config);
@@ -145,7 +152,7 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
             eventStore.reindexLatestEvents(eventIndex);
         } catch (final Exception e) {
             logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest "
-                + "events will not be available from the Provenance Repository when a query is issued.", e);
+                    + "events will not be available from the Provenance Repository when a query is issued.", e);
         }
     }
 
@@ -282,4 +289,8 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
     public List<SearchableField> getSearchableAttributes() {
         return Collections.unmodifiableList(config.getSearchableAttributes());
     }
+
+    RepositoryConfiguration getConfig() {
+        return this.config;
+    }
 }
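
The new package-private init(...) above gives subclasses a seam: they can build their own RecordWriterFactory/RecordReaderFactory (for example, ones producing EncryptedSchemaRecordWriter and EncryptedSchemaRecordReader instances) and reuse the event-store and Lucene index wiring unchanged. A minimal, hypothetical sketch of that pattern follows; the class and method names are illustrative only and are not the committed EncryptedWriteAheadProvenanceRepository.

    // Hypothetical sketch; it would live in org.apache.nifi.provenance so it can reach the package-private init(...).
    class EncryptedRepositorySketch extends WriteAheadProvenanceRepository {
        void initializeWithEncryption(final RecordWriterFactory encryptedWriters,
                                      final RecordReaderFactory encryptedReaders,
                                      final EventReporter eventReporter,
                                      final Authorizer authorizer,
                                      final ProvenanceAuthorizableFactory resourceFactory) throws IOException {
            // Delegate to the shared init(...) so the PartitionedWriteAheadEventStore and index setup are reused as-is.
            init(encryptedWriters, encryptedReaders, eventReporter, authorizer, resourceFactory);
        }
    }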

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
index a583403..f4b47d3 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -36,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
@@ -246,7 +245,7 @@ public class LuceneEventIndex implements EventIndex {
 
             final Document document = eventConverter.convert(event, summary);
             if (document == null) {
-                logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event);
+                logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
             } else {
                 final File indexDir;
                 if (event.getEventTime() == lastEventTime) {
@@ -291,7 +290,7 @@ public class LuceneEventIndex implements EventIndex {
 
         final Document document = eventConverter.convert(event, location);
         if (document == null) {
-            logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event);
+            logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
         } else {
             final StoredDocument doc = new StoredDocument(document, location);
             boolean added = false;
@@ -357,13 +356,13 @@ public class LuceneEventIndex implements EventIndex {
             eventOption = eventStore.getEvent(eventId);
         } catch (final Exception e) {
             logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, e);
-            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user.getIdentity());
             result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information.");
             return result;
         }
 
         if (!eventOption.isPresent()) {
-            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String> emptySet(), 1, user.getIdentity());
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, user.getIdentity());
             result.getResult().setError("Could not find Provenance Event with ID " + eventId);
             lineageSubmissionMap.put(result.getLineageIdentifier(), result);
             return result;
@@ -524,7 +523,7 @@ public class LuceneEventIndex implements EventIndex {
                 }
                 default: {
                     final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN,
-                        eventId, Collections.<String> emptyList(), 1, userId);
+                        eventId, Collections.emptyList(), 1, userId);
 
                     lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                     submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
@@ -533,7 +532,7 @@ public class LuceneEventIndex implements EventIndex {
             }
         } catch (final Exception e) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN,
-                eventId, Collections.<String> emptyList(), 1, userId);
+                eventId, Collections.emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
             submission.getResult().setError("Failed to expand children for lineage of event with ID " + eventId + " due to: " + e);
             return submission;
@@ -564,7 +563,7 @@ public class LuceneEventIndex implements EventIndex {
                 }
                 default: {
                     final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS,
-                        eventId, Collections.<String> emptyList(), 1, userId);
+                        eventId, Collections.emptyList(), 1, userId);
 
                     lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                     submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
@@ -573,7 +572,7 @@ public class LuceneEventIndex implements EventIndex {
             }
         } catch (final Exception e) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS,
-                eventId, Collections.<String> emptyList(), 1, userId);
+                eventId, Collections.emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
 
             submission.getResult().setError("Failed to expand parents for lineage of event with ID " + eventId + " due to: " + e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
index d6f50dd..2bc7fbe 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java
@@ -56,4 +56,12 @@ public class EventFieldNames {
     public static final String EXPLICIT_VALUE = "Explicit Value";
     public static final String LOOKUP_VALUE = "Lookup Value";
     public static final String UNCHANGED_VALUE = "Unchanged";
+
+    // For encrypted records
+    public static final String IS_ENCRYPTED = "Encrypted Record";
+    public static final String KEY_ID = "Encryption Key ID";
+    public static final String VERSION = "Encryption Version";
+    public static final String ALGORITHM = "Encryption Algorithm";
+    public static final String IV = "Initialization Vector";
+    public static final String ENCRYPTION_DETAILS = "Encryption Details";
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
index 7b33ded..0577636 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java
@@ -66,12 +66,12 @@ public class LookupTableEventRecordFields {
     public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(EventFieldNames.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE);
 
     public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(EventFieldNames.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
-        CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+            CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
 
     public static final RecordField CURRENT_CONTENT_CLAIM_EXPLICIT = new ComplexRecordField(EventFieldNames.EXPLICIT_VALUE, EXACTLY_ONE,
-        CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+            CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
     public static final RecordField CURRENT_CONTENT_CLAIM = new UnionRecordField(EventFieldNames.CONTENT_CLAIM,
-        Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);
+            Repetition.EXACTLY_ONE, NO_VALUE, UNCHANGED_VALUE, CURRENT_CONTENT_CLAIM_EXPLICIT);
 
 
     // EventType-Specific fields

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
index 7110336..d596c8e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java
@@ -45,7 +45,6 @@ import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.UPD
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.nifi.repository.schema.RecordField;
 import org.apache.nifi.repository.schema.RecordSchema;
 
@@ -90,5 +89,4 @@ public class LookupTableEventSchema {
         final RecordSchema schema = new RecordSchema(fields);
         return schema;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
index 93c0669..dfbcd2b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
@@ -25,7 +25,6 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.zip.GZIPInputStream;
-
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.toc.TocReader;
@@ -333,7 +332,7 @@ public abstract class CompressableRecordReader implements RecordReader {
         try {
             boolean read = true;
             while (read) {
-                final Optional<StandardProvenanceEventRecord> eventOptional = readToEvent(eventId, dis, serializationVersion);
+                final Optional<StandardProvenanceEventRecord> eventOptional = this.readToEvent(eventId, dis, serializationVersion);
                 if (eventOptional.isPresent()) {
                     pushbackEvent = eventOptional.get();
                     return Optional.of(pushbackEvent);

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 8e79ddd..7ae4adc 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -27,9 +27,11 @@ import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.zip.GZIPInputStream;
-
+import org.apache.nifi.properties.NiFiPropertiesLoader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
 import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
+import org.apache.nifi.provenance.CryptoUtils;
+import org.apache.nifi.provenance.EncryptedSchemaRecordReader;
 import org.apache.nifi.provenance.EventIdFirstSchemaRecordReader;
 import org.apache.nifi.provenance.EventIdFirstSchemaRecordWriter;
 import org.apache.nifi.provenance.StandardRecordReader;
@@ -37,17 +39,25 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
 import org.apache.nifi.provenance.toc.StandardTocReader;
 import org.apache.nifi.provenance.toc.TocReader;
 import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RecordReaders {
 
+    private static Logger logger = LoggerFactory.getLogger(RecordReaders.class);
+
+    private static boolean isEncryptionAvailable = false;
+    private static boolean encryptionPropertiesRead = false;
+
     /**
      * Creates a new Record Reader that is capable of reading Provenance Event Journals
      *
-     * @param file the Provenance Event Journal to read data from
+     * @param file               the Provenance Event Journal to read data from
      * @param provenanceLogFiles collection of all provenance journal files
-     * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. This allows us to avoid
-     *            issues where a FlowFile has an extremely large attribute and reading events
-     *            for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap
+     * @param maxAttributeChars  the maximum number of characters to retrieve for any one attribute. This allows us to avoid
+     *                           issues where a FlowFile has an extremely large attribute and reading events
+     *                           for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap
      * @return a Record Reader capable of reading Provenance Event Journals
      * @throws IOException if unable to create a Record Reader for the given file
      */
@@ -68,7 +78,7 @@ public class RecordReaders {
                 }
             }
 
-            if ( file.exists() ) {
+            if (file.exists()) {
                 try {
                     fis = new FileInputStream(file);
                 } catch (final FileNotFoundException fnfe) {
@@ -77,7 +87,8 @@ public class RecordReaders {
             }
 
             String filename = file.getName();
-            openStream: while ( fis == null ) {
+            openStream:
+            while (fis == null) {
                 final File dir = file.getParentFile();
                 final String baseName = LuceneUtil.substringBefore(file.getName(), ".prov");
 
@@ -85,9 +96,9 @@ public class RecordReaders {
                 // filename that we need. The majority of the time, we will use the extension ".prov.gz"
                 // because most often we are compressing on rollover and most often we have already finished
                 // compressing by the time that we are querying the data.
-                for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+                for (final String extension : new String[]{".prov.gz", ".prov"}) {
                     file = new File(dir, baseName + extension);
-                    if ( file.exists() ) {
+                    if (file.exists()) {
                         try {
                             fis = new FileInputStream(file);
                             filename = baseName + extension;
@@ -104,7 +115,7 @@ public class RecordReaders {
                 break;
             }
 
-            if ( fis == null ) {
+            if (fis == null) {
                 throw new FileNotFoundException("Unable to locate file " + originalFile);
             }
 
@@ -148,12 +159,25 @@ public class RecordReaders {
                     final TocReader tocReader = new StandardTocReader(tocFile);
                     return new EventIdFirstSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
                 }
+                case EncryptedSchemaRecordReader.SERIALIZATION_NAME: {
+                    if (!tocFile.exists()) {
+                        throw new FileNotFoundException("Cannot create TOC Reader because the file " + tocFile + " does not exist");
+                    }
+
+                    if (!isEncryptionAvailable()) {
+                        throw new IOException("Cannot read encrypted repository because this reader is not configured for encryption");
+                    }
+
+                    final TocReader tocReader = new StandardTocReader(tocFile);
+                    // Return a reader with no eventEncryptor because this method contract cannot change, then inject the encryptor from the writer in the calling method
+                    return new EncryptedSchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars, null);
+                }
                 default: {
                     throw new IOException("Unable to read data from file " + file + " because the file was written using an unknown Serializer: " + serializationName);
                 }
             }
         } catch (final IOException ioe) {
-            if ( fis != null ) {
+            if (fis != null) {
                 try {
                     fis.close();
                 } catch (final IOException inner) {
@@ -165,4 +189,20 @@ public class RecordReaders {
         }
     }
 
+    private static boolean isEncryptionAvailable() {
+        if (encryptionPropertiesRead) {
+            return isEncryptionAvailable;
+        } else {
+            try {
+                NiFiProperties niFiProperties = NiFiPropertiesLoader.loadDefaultWithKeyFromBootstrap();
+                isEncryptionAvailable = CryptoUtils.isProvenanceRepositoryEncryptionConfigured(niFiProperties);
+                encryptionPropertiesRead = true;
+            } catch (IOException e) {
+                logger.error("Encountered an error checking the provenance repository encryption configuration: ", e);
+                isEncryptionAvailable = false;
+            }
+            return isEncryptionAvailable;
+        }
+    }
+
 }
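
Note the null encryptor handed to EncryptedSchemaRecordReader above: newRecordReader(...) keeps its existing contract, so a caller that knows about encryption is expected to inject the encryptor afterwards, as the inline comment says. A hypothetical calling pattern is sketched below; the setter name is assumed for illustration and is not shown in this diff.

    RecordReader reader = RecordReaders.newRecordReader(file, provenanceLogFiles, maxAttributeChars);
    if (reader instanceof EncryptedSchemaRecordReader) {
        // Supply the same ProvenanceEventEncryptor the repository configured for its writers (assumed setter name).
        ((EncryptedSchemaRecordReader) reader).setProvenanceEventEncryptor(provenanceEventEncryptor);
    }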

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
index 41d5ade..766278a 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.util;
 
 import java.util.List;
 import java.util.Map;
-
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.serialization.StorageSummary;
@@ -182,4 +181,14 @@ public class StorageSummaryEvent implements ProvenanceEventRecord {
     public Long getPreviousContentClaimOffset() {
         return event.getPreviousContentClaimOffset();
     }
+
+    /**
+     * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability).
+     *
+     * @return a descriptive event ID to allow tracing
+     */
+    @Override
+    public String getBestEventIdentifier() {
+        return Long.toString(getEventId());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
index 6a353d2..94cc70c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.provenance.PersistentProvenanceRepository
-org.apache.nifi.provenance.WriteAheadProvenanceRepository
\ No newline at end of file
+org.apache.nifi.provenance.WriteAheadProvenanceRepository
+org.apache.nifi.provenance.EncryptedWriteAheadProvenanceRepository
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
new file mode 100644
index 0000000..ec8c225
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedSchemaRecordReaderWriterTest.groovy
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance
+
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.serialization.RecordReader
+import org.apache.nifi.provenance.serialization.RecordWriter
+import org.apache.nifi.provenance.toc.StandardTocReader
+import org.apache.nifi.provenance.toc.StandardTocWriter
+import org.apache.nifi.provenance.toc.TocReader
+import org.apache.nifi.provenance.toc.TocUtil
+import org.apache.nifi.provenance.toc.TocWriter
+import org.apache.nifi.util.file.FileUtils
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.AfterClass
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.ClassRule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import javax.crypto.Cipher
+import javax.crypto.spec.SecretKeySpec
+import java.security.KeyManagementException
+import java.security.Security
+import java.util.concurrent.atomic.AtomicLong
+
+import static groovy.test.GroovyAssert.shouldFail
+import static org.apache.nifi.provenance.TestUtil.createFlowFile
+
+@RunWith(JUnit4.class)
+class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWriter {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReaderWriterTest.class)
+
+    private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
+    private static final String KEY_HEX_256 = KEY_HEX_128 * 2
+    private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
+    private static final String KEY_ID = "K1"
+
+    private static final String TRANSIT_URI = "nifi://unit-test"
+    private static final String PROCESSOR_TYPE = "Mock Processor"
+    private static final String COMPONENT_ID = "1234"
+
+    private static final int UNCOMPRESSED_BLOCK_SIZE = 1024 * 32
+    private static final int MAX_ATTRIBUTE_SIZE = 2048
+
+    private static final AtomicLong idGenerator = new AtomicLong(0L)
+    private File journalFile
+    private File tocFile
+
+    private static KeyProvider mockKeyProvider
+    private static ProvenanceEventEncryptor provenanceEventEncryptor = new AESProvenanceEventEncryptor()
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder()
+
+    private static String ORIGINAL_LOG_LEVEL
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance")
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG")
+
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+
+        mockKeyProvider = [
+                getKey            : { String keyId ->
+                    logger.mock("Requesting key ID: ${keyId}")
+                    if (keyId == KEY_ID) {
+                        new SecretKeySpec(Hex.decode(KEY_HEX), "AES")
+                    } else {
+                        throw new KeyManagementException("${keyId} is not available")
+                    }
+                },
+                getAvailableKeyIds: { ->
+                    logger.mock("Available key IDs: [${KEY_ID}]")
+                    [KEY_ID]
+                },
+                keyExists         : { String keyId ->
+                    logger.mock("Checking availability of key ID: ${keyId}")
+                    keyId == KEY_ID
+                }] as KeyProvider
+        provenanceEventEncryptor.initialize(mockKeyProvider)
+    }
+
+    @Before
+    void setUp() throws Exception {
+        journalFile = new File("target/storage/${UUID.randomUUID()}/testEventIdFirstSchemaRecordReaderWriter")
+        tocFile = TocUtil.getTocFile(journalFile)
+        idGenerator.set(0L)
+    }
+
+    @After
+    void tearDown() throws Exception {
+        try {
+            FileUtils.deleteFile(journalFile.getParentFile(), true)
+        } catch (Exception e) {
+            logger.error(e.getMessage())
+        }
+    }
+
+    @AfterClass
+    static void tearDownOnce() throws Exception {
+        if (ORIGINAL_LOG_LEVEL) {
+            System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", ORIGINAL_LOG_LEVEL)
+        }
+        try {
+            FileUtils.deleteFile(new File("target/storage"), true)
+        } catch (Exception e) {
+            logger.error(e)
+        }
+    }
+
+    private static boolean isUnlimitedStrengthCryptoAvailable() {
+        Cipher.getMaxAllowedKeyLength("AES") > 128
+    }
+
+    private static
+    final FlowFile buildFlowFile(Map attributes = [:], long id = idGenerator.getAndIncrement(), long fileSize = 3000L) {
+        if (!attributes?.uuid) {
+            attributes.uuid = UUID.randomUUID().toString()
+        }
+        createFlowFile(id, fileSize, attributes)
+    }
+
+    private
+    static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
+        builder.setEventTime(eventTime)
+        builder.setEventType(eventType)
+        builder.setTransitUri(transitUri)
+        builder.fromFlowFile(flowfile)
+        builder.setComponentId(componentId)
+        builder.setComponentType(componentType)
+        builder.build()
+    }
+
+    @Override
+    protected RecordWriter createWriter(
+            final File file,
+            final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        createWriter(file, tocWriter, compressed, uncompressedBlockSize, provenanceEventEncryptor)
+    }
+
+    protected static RecordWriter createWriter(
+            final File file,
+            final TocWriter tocWriter,
+            final boolean compressed,
+            final int uncompressedBlockSize, ProvenanceEventEncryptor encryptor) throws IOException {
+        return new EncryptedSchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY, encryptor, 1)
+    }
+
+    @Override
+    protected RecordReader createReader(
+            final InputStream inputStream,
+            final String journalFilename, final TocReader tocReader, final int maxAttributeSize) throws IOException {
+        return new EncryptedSchemaRecordReader(inputStream, journalFilename, tocReader, maxAttributeSize, provenanceEventEncryptor)
+    }
+
+    /**
+     * Build a record and write it to the repository with the encrypted writer. Recover with the encrypted reader and verify.
+     */
+    @Test
+    void testShouldWriteAndReadEncryptedRecord() {
+        // Arrange
+        final ProvenanceEventRecord record = buildEventRecord()
+        logger.info("Built sample PER: ${record}")
+
+        TocWriter tocWriter = new StandardTocWriter(tocFile, false, false)
+
+        RecordWriter encryptedWriter = createWriter(journalFile, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE)
+        logger.info("Generated encrypted writer: ${encryptedWriter}")
+
+        // Act
+        int encryptedRecordId = idGenerator.get()
+        encryptedWriter.writeHeader(encryptedRecordId)
+        encryptedWriter.writeRecord(record)
+        encryptedWriter.close()
+        logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
+
+        // Assert
+        TocReader tocReader = new StandardTocReader(tocFile)
+        final FileInputStream fis = new FileInputStream(journalFile)
+        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE)
+        logger.info("Generated encrypted reader: ${reader}")
+
+        ProvenanceEventRecord encryptedEvent = reader.nextRecord()
+        assert encryptedEvent
+        assert encryptedRecordId as long == encryptedEvent.getEventId()
+        assert record.componentId == encryptedEvent.getComponentId()
+        assert record.componentType == encryptedEvent.getComponentType()
+        logger.info("Successfully read encrypted record: ${encryptedEvent}")
+
+        assert !reader.nextRecord()
+    }
+
+    /**
+     * Build a record and write it with a standard writer and the encrypted writer to different repositories. Recover with the standard reader and the contents of the encrypted record should be unreadable.
+     */
+    @Test
+    void testShouldWriteEncryptedRecordAndPlainRecord() {
+        // Arrange
+        final ProvenanceEventRecord record = buildEventRecord()
+        logger.info("Built sample PER: ${record}")
+
+        TocWriter tocWriter = new StandardTocWriter(tocFile, false, false)
+
+        RecordWriter standardWriter = new EventIdFirstSchemaRecordWriter(journalFile, idGenerator, tocWriter, false, UNCOMPRESSED_BLOCK_SIZE, IdentifierLookup.EMPTY)
+        logger.info("Generated standard writer: ${standardWriter}")
+
+        File encryptedJournalFile = new File(journalFile.absolutePath + "_encrypted")
+        File encryptedTocFile = TocUtil.getTocFile(encryptedJournalFile)
+        TocWriter encryptedTocWriter = new StandardTocWriter(encryptedTocFile, false, false)
+        RecordWriter encryptedWriter = createWriter(encryptedJournalFile, encryptedTocWriter, false, UNCOMPRESSED_BLOCK_SIZE)
+        logger.info("Generated encrypted writer: ${encryptedWriter}")
+
+        // Act
+        int standardRecordId = idGenerator.get()
+        standardWriter.writeHeader(standardRecordId)
+        standardWriter.writeRecord(record)
+        standardWriter.close()
+        logger.info("Wrote standard record ${standardRecordId} to journal")
+
+        int encryptedRecordId = idGenerator.get()
+        encryptedWriter.writeHeader(encryptedRecordId)
+        encryptedWriter.writeRecord(record)
+        encryptedWriter.close()
+        logger.info("Wrote encrypted record ${encryptedRecordId} to journal")
+
+        // Assert
+        TocReader tocReader = new StandardTocReader(tocFile)
+        final FileInputStream fis = new FileInputStream(journalFile)
+        final RecordReader reader = new EventIdFirstSchemaRecordReader(fis, journalFile.getName(), tocReader, MAX_ATTRIBUTE_SIZE)
+        logger.info("Generated standard reader: ${reader}")
+
+        ProvenanceEventRecord standardEvent = reader.nextRecord()
+        assert standardEvent
+        assert standardRecordId as long == standardEvent.getEventId()
+        assert record.componentId == standardEvent.getComponentId()
+        assert record.componentType == standardEvent.getComponentType()
+        logger.info("Successfully read standard record: ${standardEvent}")
+
+        assert !reader.nextRecord()
+
+        // Demonstrate unable to read from encrypted file with standard reader
+        TocReader incompatibleTocReader = new StandardTocReader(encryptedTocFile)
+        final FileInputStream efis = new FileInputStream(encryptedJournalFile)
+        RecordReader incompatibleReader = new EventIdFirstSchemaRecordReader(efis, encryptedJournalFile.getName(), incompatibleTocReader, MAX_ATTRIBUTE_SIZE)
+        logger.info("Generated standard reader (attempting to read encrypted file): ${incompatibleReader}")
+
+        def msg = shouldFail(EOFException) {
+            ProvenanceEventRecord encryptedEvent = incompatibleReader.nextRecord()
+        }
+        logger.expected(msg)
+        assert msg =~ "EOFException: Failed to read field"
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
new file mode 100644
index 0000000..42cc881
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/groovy/org/apache/nifi/provenance/EncryptedWriteAheadProvenanceRepositoryTest.groovy
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance
+
+import org.apache.nifi.events.EventReporter
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.provenance.serialization.RecordReaders
+import org.apache.nifi.reporting.Severity
+import org.apache.nifi.util.file.FileUtils
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.AfterClass
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.ClassRule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import javax.crypto.Cipher
+import java.security.Security
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile
+
+@RunWith(JUnit4.class)
+class EncryptedWriteAheadProvenanceRepositoryTest {
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedWriteAheadProvenanceRepositoryTest.class)
+
+    private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
+    private static final String KEY_HEX_256 = KEY_HEX_128 * 2
+    private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
+    private static final String KEY_ID = "K1"
+
+    private static final String TRANSIT_URI = "nifi://unit-test"
+    private static final String PROCESSOR_TYPE = "Mock Processor"
+    private static final String COMPONENT_ID = "1234"
+
+    private static final AtomicLong recordId = new AtomicLong()
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder()
+
+    private ProvenanceRepository repo
+    private static RepositoryConfiguration config
+
+    public static final int DEFAULT_ROLLOVER_MILLIS = 2000
+    private EventReporter eventReporter
+    private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
+
+    private static String ORIGINAL_LOG_LEVEL
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance")
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG")
+
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+        reportedEvents?.clear()
+        eventReporter = createMockEventReporter()
+    }
+
+    @After
+    void tearDown() throws Exception {
+        closeRepo(repo, config)
+
+        // Reset the boolean determiner
+        RecordReaders.encryptionPropertiesRead = false
+        RecordReaders.isEncryptionAvailable = false
+    }
+
+    @AfterClass
+    static void tearDownOnce() throws Exception {
+        if (ORIGINAL_LOG_LEVEL) {
+            System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", ORIGINAL_LOG_LEVEL)
+        }
+    }
+
+    private static boolean isUnlimitedStrengthCryptoAvailable() {
+        Cipher.getMaxAllowedKeyLength("AES") > 128
+    }
+
+    private static RepositoryConfiguration createConfiguration() {
+        RepositoryConfiguration config = new RepositoryConfiguration()
+        config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID().toString()))
+        config.setCompressOnRollover(true)
+        config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
+        config.setCompressionBlockBytes(100)
+        return config
+    }
+
+    private static RepositoryConfiguration createEncryptedConfiguration() {
+        RepositoryConfiguration config = createConfiguration()
+        config.setEncryptionKeys([(KEY_ID): KEY_HEX])
+        config.setKeyId(KEY_ID)
+        config.setKeyProviderImplementation(StaticKeyProvider.class.name)
+        config
+    }
+
+    private EventReporter createMockEventReporter() {
+        [reportEvent: { Severity s, String c, String m ->
+            ReportedEvent event = new ReportedEvent(s, c, m)
+            reportedEvents.add(event)
+            logger.mock("Added ${event}")
+        }] as EventReporter
+    }
+
+    private void closeRepo(ProvenanceRepository repo = this.repo, RepositoryConfiguration config = this.config) throws IOException {
+        if (repo == null) {
+            return
+        }
+
+        try {
+            repo.close()
+        } catch (IOException ioe) {
+        }
+
+        // Delete all of the storage files. We do this in order to clean up the tons of files that
+        // we create but also to ensure that we have closed all of the file handles. If we leave any
+        // streams open, for instance, this will throw an IOException, causing our unit test to fail.
+        if (config != null) {
+            for (final File storageDir : config.getStorageDirectories().values()) {
+                int i
+                for (i = 0; i < 3; i++) {
+                    try {
+                        FileUtils.deleteFile(storageDir, true)
+                        break
+                    } catch (IOException ioe) {
+                        // if there is a virus scanner, etc. running in the background we may not be able to
+                        // delete the file. Wait a sec and try again.
+                        if (i == 2) {
+                            throw ioe
+                        } else {
+                            try {
+                                System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists())
+                                FileUtils.deleteFile(storageDir, true)
+                                break
+                            } catch (final IOException ioe2) {
+                                // if there is a virus scanner, etc. running in the background we may not be able to
+                                // delete the file. Wait a sec and try again.
+                                if (i == 2) {
+                                    throw ioe2
+                                } else {
+                                    try {
+                                        Thread.sleep(1000L)
+                                    } catch (final InterruptedException ie) {
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private static
+    final FlowFile buildFlowFile(Map attributes = [:], long id = recordId.getAndIncrement(), long fileSize = 3000L) {
+        if (!attributes?.uuid) {
+            attributes.uuid = UUID.randomUUID().toString()
+        }
+        createFlowFile(id, fileSize, attributes)
+    }
+
+    private
+    static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
+        builder.setEventTime(eventTime)
+        builder.setEventType(eventType)
+        builder.setTransitUri(transitUri)
+        builder.fromFlowFile(flowfile)
+        builder.setComponentId(componentId)
+        builder.setComponentType(componentType)
+        builder.build()
+    }
+
+    /**
+     * This test operates on {@link PersistentProvenanceRepository} to verify the normal operations of existing implementations.
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Test
+    void testPersistentProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
+        // Arrange
+        config = createConfiguration()
+        config.setMaxEventFileCapacity(1L)
+        config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS)
+        repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+        Map attributes = ["abc": "xyz",
+                          "123": "456"]
+        final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
+
+        final int RECORD_COUNT = 10
+
+        // Act
+        RECORD_COUNT.times {
+            repo.registerEvent(record)
+        }
+
+        // Sleep to let the journal merge occur
+        Thread.sleep(1000L)
+
+        final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
+
+        logger.info("Recovered ${recoveredRecords.size()} events: ")
+        recoveredRecords.each { logger.info("\t${it}") }
+
+        // Assert
+        assert recoveredRecords.size() == RECORD_COUNT
+        recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
+            assert recovered.getEventId() == (i as Long)
+            assert recovered.getTransitUri() == TRANSIT_URI
+            assert recovered.getEventType() == ProvenanceEventType.RECEIVE
+            // A UUID attribute is added automatically, so just verify that every attribute we provided is still present
+            assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
+        }
+    }
+
+    /**
+     * Verifies that the existing {@link WriteAheadProvenanceRepository} implementation registers and retrieves events normally.
+     *
+     * @throws IOException if the repository cannot be read or written
+     * @throws InterruptedException if the test is interrupted
+     */
+    @Test
+    void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
+        // Arrange
+        config = createConfiguration()
+        // Needed until NIFI-3605 is implemented
+//        config.setMaxEventFileCapacity(1L)
+        config.setMaxEventFileCount(1)
+        config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+        repo = new WriteAheadProvenanceRepository(config)
+        repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+        Map attributes = ["abc": "xyz",
+                          "123": "456"]
+        final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
+
+        final int RECORD_COUNT = 10
+
+        // Act
+        RECORD_COUNT.times {
+            repo.registerEvent(record)
+        }
+
+        final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
+
+        logger.info("Recovered ${recoveredRecords.size()} events: ")
+        recoveredRecords.each { logger.info("\t${it}") }
+
+        // Assert
+        assert recoveredRecords.size() == RECORD_COUNT
+        recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
+            assert recovered.getEventId() == (i as Long)
+            assert recovered.getTransitUri() == TRANSIT_URI
+            assert recovered.getEventType() == ProvenanceEventType.RECEIVE
+            // A UUID attribute is added automatically, so just verify that every attribute we provided is still present
+            assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
+        }
+    }
+
+    @Test
+    void testShouldRegisterAndGetEvent() {
+        // Arrange
+
+        // Force RecordReaders to report that encryption is configured and available
+        RecordReaders.encryptionPropertiesRead = true
+        RecordReaders.isEncryptionAvailable = true
+
+        config = createEncryptedConfiguration()
+        // Needed until NIFI-3605 is implemented
+//        config.setMaxEventFileCapacity(1L)
+        config.setMaxEventFileCount(1)
+        config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+        repo = new EncryptedWriteAheadProvenanceRepository(config)
+        repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+        Map attributes = ["abc": "This is a plaintext attribute.",
+                          "123": "This is another plaintext attribute."]
+        final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
+
+        final long LAST_RECORD_ID = repo.getMaxEventId()
+
+        // Act
+        repo.registerEvent(record)
+
+        // Retrieve the event through the interface
+        ProvenanceEventRecord recoveredRecord = repo.getEvent(LAST_RECORD_ID + 1)
+        logger.info("Recovered ${recoveredRecord}")
+
+        // Assert
+        assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1
+        assert recoveredRecord.getTransitUri() == TRANSIT_URI
+        assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
+        // A UUID attribute is added automatically, so just verify that every attribute we provided is still present
+        assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
+    }
+
+    @Test
+    void testShouldRegisterAndGetEvents() {
+        // Arrange
+        final int RECORD_COUNT = 10
+
+        // Force RecordReaders to report that encryption is configured and available
+        RecordReaders.encryptionPropertiesRead = true
+        RecordReaders.isEncryptionAvailable = true
+
+        config = createEncryptedConfiguration()
+        // Needed until NIFI-3605 is implemented
+//        config.setMaxEventFileCapacity(1L)
+        config.setMaxEventFileCount(1)
+        config.setMaxEventFileLife(1, TimeUnit.SECONDS)
+        repo = new EncryptedWriteAheadProvenanceRepository(config)
+        repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
+
+        Map attributes = ["abc": "This is a plaintext attribute.",
+                          "123": "This is another plaintext attribute."]
+        final List<ProvenanceEventRecord> records = []
+        RECORD_COUNT.times { int i ->
+            records << buildEventRecord(buildFlowFile(attributes + [count: i as String]))
+        }
+        logger.info("Generated ${RECORD_COUNT} records")
+
+        final long LAST_RECORD_ID = repo.getMaxEventId()
+
+        // Act
+        repo.registerEvents(records)
+        logger.info("Registered events")
+
+        // Retrieve the events through the interface
+        List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
+        logger.info("Recovered ${recoveredRecords.size()} records")
+
+        // Assert
+        recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
+            assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + i
+            assert recoveredRecord.getTransitUri() == TRANSIT_URI
+            assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
+            // A UUID attribute is added automatically, so just verify that every attribute we provided is still present
+            assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
+            assert recoveredRecord.getAttribute("count") == i as String
+        }
+    }
+
+    private static class ReportedEvent {
+        final Severity severity
+        final String category
+        final String message
+
+        ReportedEvent(final Severity severity, final String category, final String message) {
+            this.severity = severity
+            this.category = category
+            this.message = message
+        }
+
+        @Override
+        String toString() {
+            "ReportedEvent [${severity}] ${category}: ${message}"
+        }
+    }
+}
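
For quick reference, the lifecycle the Groovy tests above exercise boils down to the following Java-style sketch; the `config`, `eventReporter`, and `record` objects are assumed to be built the same way the test helpers above build them, and the imports match those already used in the hunks:

    // Sketch of the test flow above, not a drop-in snippet; exception handling omitted.
    config.setMaxEventFileCount(1);
    config.setMaxEventFileLife(1, TimeUnit.SECONDS);

    ProvenanceRepository repo = new EncryptedWriteAheadProvenanceRepository(config);
    repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY);

    repo.registerEvent(record);                                            // write one event
    ProvenanceEventRecord recovered = repo.getEvent(repo.getMaxEventId()); // read it back through the public interface
    repo.close();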

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
index 36397c4..4b2ca50 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.StandardTocReader;
@@ -67,19 +66,25 @@ public abstract class AbstractTestRecordReaderWriter {
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
+        final String expectedTransitUri = "nifi://unit-test";
+        final int expectedBlockIndex = 0;
+
+        assertRecoveredRecord(journalFile, tocReader, expectedTransitUri, expectedBlockIndex);
+
+        FileUtils.deleteFile(journalFile.getParentFile(), true);
+    }
 
+    private void assertRecoveredRecord(File journalFile, TocReader tocReader, String expectedTransitUri, int expectedBlockIndex) throws IOException {
         try (final FileInputStream fis = new FileInputStream(journalFile);
-            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
-            assertEquals(0, reader.getBlockIndex());
-            reader.skipToBlock(0);
+             final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+            assertEquals(expectedBlockIndex, reader.getBlockIndex());
+            reader.skipToBlock(expectedBlockIndex);
             final StandardProvenanceEventRecord recovered = reader.nextRecord();
             assertNotNull(recovered);
 
-            assertEquals("nifi://unit-test", recovered.getTransitUri());
+            assertEquals(expectedTransitUri, recovered.getTransitUri());
             assertNull(reader.nextRecord());
         }
-
-        FileUtils.deleteFile(journalFile.getParentFile(), true);
     }
 
 
@@ -96,16 +101,7 @@ public abstract class AbstractTestRecordReaderWriter {
 
         final TocReader tocReader = new StandardTocReader(tocFile);
 
-        try (final FileInputStream fis = new FileInputStream(journalFile);
-            final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
-            assertEquals(0, reader.getBlockIndex());
-            reader.skipToBlock(0);
-            final StandardProvenanceEventRecord recovered = reader.nextRecord();
-            assertNotNull(recovered);
-
-            assertEquals("nifi://unit-test", recovered.getTransitUri());
-            assertNull(reader.nextRecord());
-        }
+        assertRecoveredRecord(journalFile, tocReader, "nifi://unit-test", 0);
 
         FileUtils.deleteFile(journalFile.getParentFile(), true);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index f08fed4..c3fbf42 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -16,6 +16,25 @@
  */
 package org.apache.nifi.provenance;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationResult;
 import org.apache.nifi.authorization.AuthorizationResult.Result;
@@ -42,26 +61,6 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.RingBuffer.IterationDirection;
 import org.apache.nifi.web.ResourceNotFoundException;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
 public class VolatileProvenanceRepository implements ProvenanceRepository {
 
     // properties
@@ -472,7 +471,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
     }
 
     public Lineage computeLineage(final String flowFileUUID, final NiFiUser user) throws IOException {
-        return computeLineage(Collections.<String>singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null);
+        return computeLineage(Collections.singleton(flowFileUUID), user, LineageComputationType.FLOWFILE_LINEAGE, null);
     }
 
     private Lineage computeLineage(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) throws IOException {
@@ -497,7 +496,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
         final ProvenanceEventRecord event = getEvent(eventId);
         if (event == null) {
             final String userId = user.getIdentity();
-            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.<String>emptySet(), 1, userId);
+            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, eventId, Collections.emptySet(), 1, userId);
             result.getResult().setError("Could not find event with ID " + eventId);
             lineageSubmissionMap.put(result.getLineageIdentifier(), result);
             return result;
@@ -541,9 +540,9 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
 
         final ProvenanceEventRecord event = getEvent(eventId, user);
         if (event == null) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-            submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L);
+            submission.getResult().update(Collections.emptyList(), 0L);
             return submission;
         }
 
@@ -554,7 +553,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
             case CLONE:
                 return submitLineageComputation(event.getParentUuids(), user, LineageComputationType.EXPAND_PARENTS, eventId);
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1, userId);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, userId);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
                 return submission;
@@ -572,9 +571,9 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
 
         final ProvenanceEventRecord event = getEvent(eventId, user);
         if (event == null) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-            submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L);
+            submission.getResult().update(Collections.emptyList(), 0L);
             return submission;
         }
 
@@ -585,7 +584,7 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
             case CLONE:
                 return submitLineageComputation(event.getChildUuids(), user, LineageComputationType.EXPAND_CHILDREN, eventId);
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1, userId);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.emptyList(), 1, userId);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                 return submission;
@@ -873,5 +872,15 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
         public Long getPreviousContentClaimOffset() {
             return record.getPreviousContentClaimOffset();
         }
+
+        /**
+         * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability).
+         *
+         * @return a descriptive event ID to allow tracing
+         */
+        @Override
+        public String getBestEventIdentifier() {
+            return Long.toString(getEventId());
+        }
     }
 }
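
A minimal usage sketch for the new accessor; the `repository`, `eventId`, `user`, and `logger` names are illustrative only, not part of this patch:

    // In this wrapper the identifier is simply the numeric event ID rendered as a
    // String (see getBestEventIdentifier() above), so it is always safe to log.
    ProvenanceEventRecord event = repository.getEvent(eventId, user);
    logger.debug("Handling provenance event {}", event.getBestEventIdentifier());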

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index d0a1f1c..0b206e2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -437,7 +437,7 @@
                         <exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
                         <exclude>src/test/resources/TestExtractGrok/patterns</exclude>
                         <!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 -->
-                        <exclude>src/main/java/org/apache/nifi/processors/standard/util/crypto/bcrypt/BCrypt.java</exclude>
+                        <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
index 103790e..db6d9ba 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncryptContent.java
@@ -16,6 +16,16 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.nio.charset.StandardCharsets;
+import java.security.Security;
+import java.text.Normalizer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.StringUtils;
@@ -41,27 +51,16 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.crypto.CipherUtility;
-import org.apache.nifi.processors.standard.util.crypto.KeyedEncryptor;
-import org.apache.nifi.processors.standard.util.crypto.OpenPGPKeyBasedEncryptor;
-import org.apache.nifi.processors.standard.util.crypto.OpenPGPPasswordBasedEncryptor;
-import org.apache.nifi.processors.standard.util.crypto.PasswordBasedEncryptor;
 import org.apache.nifi.security.util.EncryptionMethod;
 import org.apache.nifi.security.util.KeyDerivationFunction;
+import org.apache.nifi.security.util.crypto.CipherUtility;
+import org.apache.nifi.security.util.crypto.KeyedEncryptor;
+import org.apache.nifi.security.util.crypto.OpenPGPKeyBasedEncryptor;
+import org.apache.nifi.security.util.crypto.OpenPGPPasswordBasedEncryptor;
+import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor;
 import org.apache.nifi.util.StopWatch;
 import org.bouncycastle.jce.provider.BouncyCastleProvider;
 
-import java.nio.charset.StandardCharsets;
-import java.security.Security;
-import java.text.Normalizer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 @EventDriven
 @SideEffectFree
 @SupportsBatching

http://git-wip-us.apache.org/repos/asf/nifi/blob/7d242076/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java
deleted file mode 100644
index 907aed2..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/crypto/AESKeyedCipherProvider.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util.crypto;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.security.util.EncryptionMethod;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.crypto.Cipher;
-import javax.crypto.NoSuchPaddingException;
-import javax.crypto.SecretKey;
-import javax.crypto.spec.IvParameterSpec;
-import java.io.UnsupportedEncodingException;
-import java.security.InvalidAlgorithmParameterException;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.SecureRandom;
-import java.security.spec.InvalidKeySpecException;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * This is a standard implementation of {@link KeyedCipherProvider} which supports {@code AES} cipher families with arbitrary modes of operation (currently only {@code CBC}, {@code CTR}, and {@code
- * GCM} are supported as {@link EncryptionMethod}s.
- */
-public class AESKeyedCipherProvider extends KeyedCipherProvider {
-    private static final Logger logger = LoggerFactory.getLogger(AESKeyedCipherProvider.class);
-    private static final int IV_LENGTH = 16;
-    private static final List<Integer> VALID_KEY_LENGTHS = Arrays.asList(128, 192, 256);
-
-    /**
-     * Returns an initialized cipher for the specified algorithm. The IV is provided externally to allow for non-deterministic IVs, as IVs
-     * deterministically derived from the password are a potential vulnerability and compromise semantic security. See
-     * <a href="http://crypto.stackexchange.com/a/3970/12569">Ilmari Karonen's answer on Crypto Stack Exchange</a>
-     *
-     * @param encryptionMethod the {@link EncryptionMethod}
-     * @param key              the key
-     * @param iv               the IV or nonce (cannot be all 0x00)
-     * @param encryptMode      true for encrypt, false for decrypt
-     * @return the initialized cipher
-     * @throws Exception if there is a problem initializing the cipher
-     */
-    @Override
-    public Cipher getCipher(EncryptionMethod encryptionMethod, SecretKey key, byte[] iv, boolean encryptMode) throws Exception {
-        try {
-            return getInitializedCipher(encryptionMethod, key, iv, encryptMode);
-        } catch (IllegalArgumentException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new ProcessException("Error initializing the cipher", e);
-        }
-    }
-
-    /**
-     * Returns an initialized cipher for the specified algorithm. The IV will be generated internally (for encryption). If decryption is requested, it will throw an exception.
-     *
-     * @param encryptionMethod the {@link EncryptionMethod}
-     * @param key              the key
-     * @param encryptMode      true for encrypt, false for decrypt
-     * @return the initialized cipher
-     * @throws Exception if there is a problem initializing the cipher or if decryption is requested
-     */
-    @Override
-    public Cipher getCipher(EncryptionMethod encryptionMethod, SecretKey key, boolean encryptMode) throws Exception {
-        return getCipher(encryptionMethod, key, new byte[0], encryptMode);
-    }
-
-    protected Cipher getInitializedCipher(EncryptionMethod encryptionMethod, SecretKey key, byte[] iv,
-                                          boolean encryptMode) throws NoSuchAlgorithmException, NoSuchProviderException,
-            InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, InvalidAlgorithmParameterException, UnsupportedEncodingException {
-        if (encryptionMethod == null) {
-            throw new IllegalArgumentException("The encryption method must be specified");
-        }
-
-        if (!encryptionMethod.isKeyedCipher()) {
-            throw new IllegalArgumentException(encryptionMethod.name() + " requires a PBECipherProvider");
-        }
-
-        String algorithm = encryptionMethod.getAlgorithm();
-        String provider = encryptionMethod.getProvider();
-
-        if (key == null) {
-            throw new IllegalArgumentException("The key must be specified");
-        }
-
-        if (!isValidKeyLength(key)) {
-            throw new IllegalArgumentException("The key must be of length [" + StringUtils.join(VALID_KEY_LENGTHS, ", ") + "]");
-        }
-
-        Cipher cipher = Cipher.getInstance(algorithm, provider);
-        final String operation = encryptMode ? "encrypt" : "decrypt";
-
-        boolean ivIsInvalid = false;
-
-        // If an IV was not provided already, generate a random IV and inject it in the cipher
-        int ivLength = cipher.getBlockSize();
-        if (iv.length != ivLength) {
-            logger.warn("An IV was provided of length {} bytes for {}ion but should be {} bytes", iv.length, operation, ivLength);
-            ivIsInvalid = true;
-        }
-
-        final byte[] emptyIv = new byte[ivLength];
-        if (Arrays.equals(iv, emptyIv)) {
-            logger.warn("An empty IV was provided of length {} for {}ion", iv.length, operation);
-            ivIsInvalid = true;
-        }
-
-        if (ivIsInvalid) {
-            if (encryptMode) {
-                logger.warn("Generating new IV. The value can be obtained in the calling code by invoking 'cipher.getIV()';");
-                iv = generateIV();
-            } else {
-                // Can't decrypt without an IV
-                throw new IllegalArgumentException("Cannot decrypt without a valid IV");
-            }
-        }
-        cipher.init(encryptMode ? Cipher.ENCRYPT_MODE : Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
-
-        return cipher;
-    }
-
-    private boolean isValidKeyLength(SecretKey key) {
-        return VALID_KEY_LENGTHS.contains(key.getEncoded().length * 8);
-    }
-
-    /**
-     * Generates a new random IV of 16 bytes using {@link java.security.SecureRandom}.
-     *
-     * @return the IV
-     */
-    public byte[] generateIV() {
-        byte[] iv = new byte[IV_LENGTH];
-        new SecureRandom().nextBytes(iv);
-        return iv;
-    }
-}