Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:02 UTC

[2/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that NiFi can be rolled back to a previous version after an upgrade.

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 35832c4..a95bd4f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -17,239 +17,142 @@
 package org.apache.nifi.provenance;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StandardRecordWriter implements RecordWriter {
+/**
+ * @deprecated Deprecated in favor of SchemaRecordWriter
+ */
+@Deprecated
+public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
     private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+    public static final int SERIALIZATION_VERSION = 9;
+    public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository";
 
     private final File file;
-    private final FileOutputStream fos;
-    private final ByteCountingOutputStream rawOutStream;
-    private final TocWriter tocWriter;
-    private final boolean compressed;
-    private final int uncompressedBlockSize;
-    private final AtomicBoolean dirtyFlag = new AtomicBoolean(false);
-
-    private DataOutputStream out;
-    private ByteCountingOutputStream byteCountingOut;
-    private long lastBlockOffset = 0L;
-    private int recordCount = 0;
-    private volatile boolean closed = false;
-
-    private final Lock lock = new ReentrantLock();
 
 
     public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(file, writer, compressed, uncompressedBlockSize);
         logger.trace("Creating Record Writer for {}", file.getName());
 
         this.file = file;
-        this.compressed = compressed;
-        this.fos = new FileOutputStream(file);
-        rawOutStream = new ByteCountingOutputStream(fos);
-        this.uncompressedBlockSize = uncompressedBlockSize;
+    }
 
-        this.tocWriter = writer;
+    public StandardRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(out, tocWriter, compressed, uncompressedBlockSize);
+        this.file = null;
     }
 
     @Override
-    public synchronized File getFile() {
-        return file;
+    protected String getSerializationName() {
+        return SERIALIZATION_NAME;
     }
 
     @Override
-    public synchronized void writeHeader(final long firstEventId) throws IOException {
-        if (isDirty()) {
-            throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
-        }
-
-        try {
-            lastBlockOffset = rawOutStream.getBytesWritten();
-            resetWriteStream(firstEventId);
-
-            out.writeUTF(PersistentProvenanceRepository.class.getName());
-            out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
-            out.flush();
-        } catch (final IOException ioe) {
-            markDirty();
-            throw ioe;
-        }
+    protected int getSerializationVersion() {
+        return SERIALIZATION_VERSION;
     }
 
-
-    /**
-     * Resets the streams to prepare for a new block
-     * @param eventId the first id that will be written to the new block
-     * @throws IOException if unable to flush/close the current streams properly
-     */
-    private void resetWriteStream(final long eventId) throws IOException {
-        try {
-            if (out != null) {
-                out.flush();
-            }
-
-            final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
-
-            final OutputStream writableStream;
-            if ( compressed ) {
-                // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-                // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-                // the underlying OutputStream in a NonCloseableOutputStream
-                // We don't have to check if the writer is dirty because we will have already checked before calling this method.
-                if ( out != null ) {
-                    out.close();
-                }
-
-                if ( tocWriter != null ) {
-                    tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
-                }
-
-                writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
-            } else {
-                if ( tocWriter != null ) {
-                    tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
-                }
-
-                writableStream = new BufferedOutputStream(rawOutStream, 65536);
-            }
-
-            this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
-            this.out = new DataOutputStream(byteCountingOut);
-            dirtyFlag.set(false);
-        } catch (final IOException ioe) {
-            markDirty();
-            throw ioe;
-        }
+    @Override
+    protected void writeHeader(long firstEventId, DataOutputStream out) throws IOException {
     }
 
     @Override
-    public synchronized long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException {
-        if (isDirty()) {
-            throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
+    protected void writeRecord(final ProvenanceEventRecord record, final long recordIdentifier, final DataOutputStream out) throws IOException {
+        final ProvenanceEventType recordType = record.getEventType();
+
+        out.writeLong(recordIdentifier);
+        out.writeUTF(record.getEventType().name());
+        out.writeLong(record.getEventTime());
+        out.writeLong(record.getFlowFileEntryDate());
+        out.writeLong(record.getEventDuration());
+        out.writeLong(record.getLineageStartDate());
+
+        writeNullableString(out, record.getComponentId());
+        writeNullableString(out, record.getComponentType());
+        writeUUID(out, record.getFlowFileUuid());
+        writeNullableString(out, record.getDetails());
+
+        // Write FlowFile attributes
+        final Map<String, String> attrs = record.getPreviousAttributes();
+        out.writeInt(attrs.size());
+        for (final Map.Entry<String, String> entry : attrs.entrySet()) {
+            writeLongString(out, entry.getKey());
+            writeLongString(out, entry.getValue());
         }
 
-        try {
-            final ProvenanceEventType recordType = record.getEventType();
-            final long startBytes = byteCountingOut.getBytesWritten();
-
-            // add a new block to the TOC if needed.
-            if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
-                lastBlockOffset = startBytes;
-
-                if ( compressed ) {
-                    // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-                    // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-                    // the underlying OutputStream in a NonCloseableOutputStream
-                    resetWriteStream(recordIdentifier);
-                }
-            }
-
-            out.writeLong(recordIdentifier);
-            out.writeUTF(record.getEventType().name());
-            out.writeLong(record.getEventTime());
-            out.writeLong(record.getFlowFileEntryDate());
-            out.writeLong(record.getEventDuration());
-            out.writeLong(record.getLineageStartDate());
-
-            writeNullableString(out, record.getComponentId());
-            writeNullableString(out, record.getComponentType());
-            writeUUID(out, record.getFlowFileUuid());
-            writeNullableString(out, record.getDetails());
-
-            // Write FlowFile attributes
-            final Map<String, String> attrs = record.getPreviousAttributes();
-            out.writeInt(attrs.size());
-            for (final Map.Entry<String, String> entry : attrs.entrySet()) {
-                writeLongString(out, entry.getKey());
-                writeLongString(out, entry.getValue());
-            }
-
-            final Map<String, String> attrUpdates = record.getUpdatedAttributes();
-            out.writeInt(attrUpdates.size());
-            for (final Map.Entry<String, String> entry : attrUpdates.entrySet()) {
-                writeLongString(out, entry.getKey());
-                writeLongNullableString(out, entry.getValue());
-            }
+        final Map<String, String> attrUpdates = record.getUpdatedAttributes();
+        out.writeInt(attrUpdates.size());
+        for (final Map.Entry<String, String> entry : attrUpdates.entrySet()) {
+            writeLongString(out, entry.getKey());
+            writeLongNullableString(out, entry.getValue());
+        }
 
-            // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
-            if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
-                out.writeBoolean(true);
-                out.writeUTF(record.getContentClaimContainer());
-                out.writeUTF(record.getContentClaimSection());
-                out.writeUTF(record.getContentClaimIdentifier());
-                if (record.getContentClaimOffset() == null) {
-                    out.writeLong(0L);
-                } else {
-                    out.writeLong(record.getContentClaimOffset());
-                }
-                out.writeLong(record.getFileSize());
+        // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
+        if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
+            out.writeBoolean(true);
+            out.writeUTF(record.getContentClaimContainer());
+            out.writeUTF(record.getContentClaimSection());
+            out.writeUTF(record.getContentClaimIdentifier());
+            if (record.getContentClaimOffset() == null) {
+                out.writeLong(0L);
             } else {
-                out.writeBoolean(false);
+                out.writeLong(record.getContentClaimOffset());
             }
+            out.writeLong(record.getFileSize());
+        } else {
+            out.writeBoolean(false);
+        }
 
-            // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
-            if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
-                out.writeBoolean(true);
-                out.writeUTF(record.getPreviousContentClaimContainer());
-                out.writeUTF(record.getPreviousContentClaimSection());
-                out.writeUTF(record.getPreviousContentClaimIdentifier());
-                if (record.getPreviousContentClaimOffset() == null) {
-                    out.writeLong(0L);
-                } else {
-                    out.writeLong(record.getPreviousContentClaimOffset());
-                }
-
-                if (record.getPreviousFileSize() == null) {
-                    out.writeLong(0L);
-                } else {
-                    out.writeLong(record.getPreviousFileSize());
-                }
+        // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
+        if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
+            out.writeBoolean(true);
+            out.writeUTF(record.getPreviousContentClaimContainer());
+            out.writeUTF(record.getPreviousContentClaimSection());
+            out.writeUTF(record.getPreviousContentClaimIdentifier());
+            if (record.getPreviousContentClaimOffset() == null) {
+                out.writeLong(0L);
             } else {
-                out.writeBoolean(false);
+                out.writeLong(record.getPreviousContentClaimOffset());
             }
 
-            // write out the identifier of the destination queue.
-            writeNullableString(out, record.getSourceQueueIdentifier());
-
-            // Write type-specific info
-            if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
-                writeUUIDs(out, record.getParentUuids());
-                writeUUIDs(out, record.getChildUuids());
-            } else if (recordType == ProvenanceEventType.RECEIVE) {
-                writeNullableString(out, record.getTransitUri());
-                writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
-            } else if (recordType == ProvenanceEventType.FETCH) {
-                writeNullableString(out, record.getTransitUri());
-            } else if (recordType == ProvenanceEventType.SEND) {
-                writeNullableString(out, record.getTransitUri());
-            } else if (recordType == ProvenanceEventType.ADDINFO) {
-                writeNullableString(out, record.getAlternateIdentifierUri());
-            } else if (recordType == ProvenanceEventType.ROUTE) {
-                writeNullableString(out, record.getRelationship());
+            if (record.getPreviousFileSize() == null) {
+                out.writeLong(0L);
+            } else {
+                out.writeLong(record.getPreviousFileSize());
             }
+        } else {
+            out.writeBoolean(false);
+        }
 
-            out.flush();
-            recordCount++;
-            return byteCountingOut.getBytesWritten() - startBytes;
-        } catch (final IOException ioe) {
-            markDirty();
-            throw ioe;
+        // write out the identifier of the destination queue.
+        writeNullableString(out, record.getSourceQueueIdentifier());
+
+        // Write type-specific info
+        if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
+            writeUUIDs(out, record.getParentUuids());
+            writeUUIDs(out, record.getChildUuids());
+        } else if (recordType == ProvenanceEventType.RECEIVE) {
+            writeNullableString(out, record.getTransitUri());
+            writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
+        } else if (recordType == ProvenanceEventType.FETCH) {
+            writeNullableString(out, record.getTransitUri());
+        } else if (recordType == ProvenanceEventType.SEND) {
+            writeNullableString(out, record.getTransitUri());
+        } else if (recordType == ProvenanceEventType.ADDINFO) {
+            writeNullableString(out, record.getAlternateIdentifierUri());
+        } else if (recordType == ProvenanceEventType.ROUTE) {
+            writeNullableString(out, record.getRelationship());
         }
     }
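The comments in the removed resetWriteStream() and writeRecord() above hinge on one detail of GZIP framing: a GZIPOutputStream only writes its trailer when it is closed, so each compressed block has to be closed without closing the underlying file stream, and the TOC records the raw byte offset at which the next block begins. A minimal standalone sketch of that pattern using only JDK classes (the names below are illustrative, not NiFi's NonCloseableOutputStream):

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    // Wrapper whose close() finishes the current block but leaves the
    // underlying stream open for the next block.
    class NonCloseable extends FilterOutputStream {
        NonCloseable(final OutputStream delegate) {
            super(delegate);
        }

        @Override
        public void close() throws IOException {
            flush();
        }
    }

    class BlockedGzipSketch {
        // Closing 'currentBlock' emits the GZIP trailer; the raw stream stays
        // open, so a fresh GZIPOutputStream can start the next block at the
        // current byte offset (the offset a TOC would record per block).
        static OutputStream startNextBlock(final OutputStream raw, final OutputStream currentBlock) throws IOException {
            if (currentBlock != null) {
                currentBlock.close();
            }
            return new GZIPOutputStream(new NonCloseable(raw));
        }
    }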
 
@@ -292,110 +195,9 @@ public class StandardRecordWriter implements RecordWriter {
         out.write(bytes);
     }
 
-    @Override
-    public synchronized void close() throws IOException {
-        closed = true;
-
-        logger.trace("Closing Record Writer for {}", file.getName());
-
-        lock();
-        try {
-            try {
-                // We want to close 'out' only if the writer is not 'dirty'.
-                // If the writer is dirty, then there was a failure to write
-                // to disk, which means that we likely have a partial record written
-                // to disk.
-                //
-                // If we call close() on out, it will in turn call flush() on the underlying
-                // output stream, which is a BufferedOutputStream. As a result, we will end
-                // up flushing the buffer after a partially written record, which results in
-                // essentially random bytes being written to the repository, which causes
-                // corruption and un-recoverability. Since we will close the underlying 'rawOutStream'
-                // below, we will still appropriately clean up the resources held by this writer, so
-                // we are still OK in terms of closing all resources held by the writer.
-                if (out != null && !isDirty()) {
-                    out.close();
-                }
-            } finally {
-                try {
-                    rawOutStream.close();
-                } finally {
-                    if (tocWriter != null) {
-                        tocWriter.close();
-                    }
-                }
-            }
-        } catch (final IOException ioe) {
-            markDirty();
-            throw ioe;
-        } finally {
-            unlock();
-        }
-    }
-
-    @Override
-    public boolean isClosed() {
-        return closed;
-    }
-
-    @Override
-    public synchronized int getRecordsWritten() {
-        return recordCount;
-    }
-
-    @Override
-    public void lock() {
-        lock.lock();
-    }
-
-    @Override
-    public void unlock() {
-        lock.unlock();
-    }
-
-    @Override
-    public boolean tryLock() {
-        final boolean obtainedLock = lock.tryLock();
-        if (obtainedLock && dirtyFlag.get()) {
-            // once we have obtained the lock, we need to check if the writer
-            // has been marked dirty. If so, we cannot write to the underlying
-            // file, so we need to unlock and return false. Otherwise, it's okay
-            // to write to the underlying file, so return true.
-            lock.unlock();
-            return false;
-        }
-        return obtainedLock;
-    }
 
     @Override
     public String toString() {
         return "StandardRecordWriter[file=" + file + "]";
     }
-
-    @Override
-    public void sync() throws IOException {
-        try {
-            if ( tocWriter != null ) {
-                tocWriter.sync();
-            }
-            fos.getFD().sync();
-        } catch (final IOException ioe) {
-            markDirty();
-            throw ioe;
-        }
-    }
-
-    @Override
-    public TocWriter getTocWriter() {
-        return tocWriter;
-    }
-
-    @Override
-    public void markDirty() {
-        dirtyFlag.set(true);
-    }
-
-    public boolean isDirty() {
-        return dirtyFlag.get();
-    }
 }
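Taken together, this diff turns a self-contained writer into a thin subclass: the CompressableRecordWriter base class introduced elsewhere in this commit owns the streams, compression, and TOC handling, while the subclass contributes only the serialization name, the version, and the per-record layout. A standalone sketch of that split, using assumed names rather than NiFi's actual classes:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Base class: frames the file with a serialization name and version, and
    // delegates the record body to the subclass (mirrors the template-method
    // shape of getSerializationName()/getSerializationVersion()/writeRecord()).
    abstract class RecordWriterSketch {
        private final DataOutputStream out;

        RecordWriterSketch(final OutputStream rawOut) {
            this.out = new DataOutputStream(rawOut);
        }

        final void writeHeader() throws IOException {
            out.writeUTF(getSerializationName());
            out.writeInt(getSerializationVersion());
        }

        final void write(final long eventId, final String payload) throws IOException {
            writeRecord(eventId, payload, out);
            out.flush();
        }

        protected abstract String getSerializationName();

        protected abstract int getSerializationVersion();

        protected abstract void writeRecord(long eventId, String payload, DataOutputStream out) throws IOException;
    }

    // Subclass in the role of the deprecated StandardRecordWriter: it only
    // knows its format name, its version, and how to lay out one record.
    class LegacyWriterSketch extends RecordWriterSketch {
        LegacyWriterSketch(final OutputStream rawOut) {
            super(rawOut);
        }

        @Override
        protected String getSerializationName() {
            return "org.apache.nifi.provenance.PersistentProvenanceRepository";
        }

        @Override
        protected int getSerializationVersion() {
            return 9;
        }

        @Override
        protected void writeRecord(final long eventId, final String payload, final DataOutputStream out) throws IOException {
            out.writeLong(eventId);
            out.writeUTF(payload);
        }
    }

A file would then be written as writeHeader() followed by one write() per event; judging from the overrides above, that is the sequence the real base class drives, with compression and TOC block handling layered underneath.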

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index f474661..f725208 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.lucene;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.lucene.document.Document;
@@ -45,7 +44,7 @@ public class IndexingAction {
     }
 
     private void addField(final Document doc, final SearchableField field, final String value, final Store store) {
-        if (value == null || (!nonAttributeSearchableFields.contains(field) && !field.isAttribute())) {
+        if (value == null || (!field.isAttribute() && !nonAttributeSearchableFields.contains(field))) {
             return;
         }
 
@@ -54,11 +53,9 @@ public class IndexingAction {
 
 
     public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
-        final Map<String, String> attributes = record.getAttributes();
-
         final Document doc = new Document();
         addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
-        addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+        addField(doc, SearchableFields.Filename, record.getAttribute(CoreAttributes.FILENAME.key()), Store.NO);
         addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
         addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
         addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
@@ -68,13 +65,10 @@ public class IndexingAction {
         addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
         addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
         addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
-
-        if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
-            addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
-        }
+        addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
 
         for (final SearchableField searchableField : attributeSearchableFields) {
-            addField(doc, searchableField, LuceneUtil.truncateIndexField(attributes.get(searchableField.getSearchableFieldName())), Store.NO);
+            addField(doc, searchableField, LuceneUtil.truncateIndexField(record.getAttribute(searchableField.getSearchableFieldName())), Store.NO);
         }
 
         final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
@@ -94,19 +88,20 @@ public class IndexingAction {
             }
 
             // If the event is a FORK, JOIN, CLONE, or REPLAY, add the FlowFileUUID for all child/parent UUIDs.
-            if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
+            final ProvenanceEventType eventType = record.getEventType();
+            if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
                 for (final String uuid : record.getChildUuids()) {
                     if (!uuid.equals(record.getFlowFileUuid())) {
                         addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
                     }
                 }
-            } else if (record.getEventType() == ProvenanceEventType.JOIN) {
+            } else if (eventType == ProvenanceEventType.JOIN) {
                 for (final String uuid : record.getParentUuids()) {
                     if (!uuid.equals(record.getFlowFileUuid())) {
                         addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
                     }
                 }
-            } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
+            } else if (eventType == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
                 // If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
                 // that the Source System uses to refer to the data.
                 final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
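The refactored index() above builds one Lucene Document per event, pulling values straight from the record; addField() drops nulls and, in the real code, also drops fields that are not configured as searchable. A small self-contained sketch of that flow against the plain Lucene API (the field names are illustrative, not NiFi's SearchableFields):

    import java.io.IOException;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;

    class IndexingSketch {
        // Null values are simply not indexed, mirroring the early return in addField().
        static void addField(final Document doc, final String name, final String value) {
            if (value == null) {
                return;
            }
            doc.add(new StringField(name, value, Store.NO));
        }

        static void index(final IndexWriter writer, final String flowFileUuid, final String componentId, final String transitUri) throws IOException {
            final Document doc = new Document();
            addField(doc, "FlowFileUUID", flowFileUuid);
            addField(doc, "ComponentID", componentId);
            addField(doc, "TransitURI", transitUri);
            writer.addDocument(doc);
        }
    }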

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
new file mode 100644
index 0000000..c9e7dc8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class EventRecord implements Record {
+    private final RecordSchema schema;
+    private final ProvenanceEventRecord event;
+    private final long eventId;
+    private final Record contentClaimRecord;
+    private final Record previousClaimRecord;
+
+    public EventRecord(final ProvenanceEventRecord event, final long eventId, final RecordSchema schema, final RecordSchema contentClaimSchema) {
+        this.schema = schema;
+        this.event = event;
+        this.eventId = eventId;
+        this.contentClaimRecord = createContentClaimRecord(contentClaimSchema, event.getContentClaimContainer(), event.getContentClaimSection(),
+            event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize());
+        this.previousClaimRecord = createContentClaimRecord(contentClaimSchema, event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+            event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset(), event.getPreviousFileSize());
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    private static Record createContentClaimRecord(final RecordSchema contentClaimSchema, final String container, final String section,
+            final String identifier, final Long offset, final Long size) {
+        if (container == null || section == null || identifier == null) {
+            return null;
+        }
+
+        final Map<RecordField, Object> fieldValues = new HashMap<>();
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_CONTAINER, container);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_SECTION, section);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_IDENTIFIER, identifier);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_OFFSET, offset);
+        fieldValues.put(EventRecordFields.CONTENT_CLAIM_SIZE, size);
+        return new FieldMapRecord(fieldValues, contentClaimSchema);
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case EventRecordFields.Names.EVENT_IDENTIFIER:
+                return eventId;
+            case EventRecordFields.Names.ALTERNATE_IDENTIFIER:
+                return event.getAlternateIdentifierUri();
+            case EventRecordFields.Names.CHILD_UUIDS:
+                return event.getChildUuids();
+            case EventRecordFields.Names.COMPONENT_ID:
+                return event.getComponentId();
+            case EventRecordFields.Names.COMPONENT_TYPE:
+                return event.getComponentType();
+            case EventRecordFields.Names.CONTENT_CLAIM:
+                return contentClaimRecord;
+            case EventRecordFields.Names.EVENT_DETAILS:
+                return event.getDetails();
+            case EventRecordFields.Names.EVENT_DURATION:
+                return event.getEventDuration();
+            case EventRecordFields.Names.EVENT_TIME:
+                return event.getEventTime();
+            case EventRecordFields.Names.EVENT_TYPE:
+                return event.getEventType().name();
+            case EventRecordFields.Names.FLOWFILE_ENTRY_DATE:
+                return event.getFlowFileEntryDate();
+            case EventRecordFields.Names.FLOWFILE_UUID:
+                return event.getFlowFileUuid();
+            case EventRecordFields.Names.LINEAGE_START_DATE:
+                return event.getLineageStartDate();
+            case EventRecordFields.Names.PARENT_UUIDS:
+                return event.getParentUuids();
+            case EventRecordFields.Names.PREVIOUS_ATTRIBUTES:
+                return event.getPreviousAttributes();
+            case EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM:
+                return previousClaimRecord;
+            case EventRecordFields.Names.RELATIONSHIP:
+                return event.getRelationship();
+            case EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER:
+                return event.getSourceQueueIdentifier();
+            case EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER:
+                return event.getSourceSystemFlowFileIdentifier();
+            case EventRecordFields.Names.TRANSIT_URI:
+                return event.getTransitUri();
+            case EventRecordFields.Names.UPDATED_ATTRIBUTES:
+                return event.getUpdatedAttributes();
+        }
+
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static StandardProvenanceEventRecord getEvent(final Record record, final String storageFilename, final long storageByteOffset, final int maxAttributeLength) {
+        final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setAlternateIdentifierUri((String) record.getFieldValue(EventRecordFields.Names.ALTERNATE_IDENTIFIER));
+        builder.setChildUuids((List<String>) record.getFieldValue(EventRecordFields.Names.CHILD_UUIDS));
+        builder.setComponentId((String) record.getFieldValue(EventRecordFields.Names.COMPONENT_ID));
+        builder.setComponentType((String) record.getFieldValue(EventRecordFields.Names.COMPONENT_TYPE));
+        builder.setDetails((String) record.getFieldValue(EventRecordFields.Names.EVENT_DETAILS));
+        builder.setEventDuration((Long) record.getFieldValue(EventRecordFields.Names.EVENT_DURATION));
+        builder.setEventTime((Long) record.getFieldValue(EventRecordFields.Names.EVENT_TIME));
+        builder.setEventType(ProvenanceEventType.valueOf((String) record.getFieldValue(EventRecordFields.Names.EVENT_TYPE)));
+        builder.setFlowFileEntryDate((Long) record.getFieldValue(EventRecordFields.Names.FLOWFILE_ENTRY_DATE));
+        builder.setFlowFileUUID((String) record.getFieldValue(EventRecordFields.Names.FLOWFILE_UUID));
+        builder.setLineageStartDate((Long) record.getFieldValue(EventRecordFields.Names.LINEAGE_START_DATE));
+        builder.setParentUuids((List<String>) record.getFieldValue(EventRecordFields.Names.PARENT_UUIDS));
+        builder.setPreviousAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventRecordFields.Names.PREVIOUS_ATTRIBUTES), maxAttributeLength));
+        builder.setEventId((Long) record.getFieldValue(EventRecordFields.Names.EVENT_IDENTIFIER));
+        builder.setRelationship((String) record.getFieldValue(EventRecordFields.Names.RELATIONSHIP));
+        builder.setSourceQueueIdentifier((String) record.getFieldValue(EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER));
+        builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
+        builder.setTransitUri((String) record.getFieldValue(EventRecordFields.Names.TRANSIT_URI));
+        builder.setUpdatedAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventRecordFields.Names.UPDATED_ATTRIBUTES), maxAttributeLength));
+
+        builder.setStorageLocation(storageFilename, storageByteOffset);
+
+        final Record currentClaimRecord = (Record) record.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM);
+        if (currentClaimRecord == null) {
+            builder.setCurrentContentClaim(null, null, null, null, 0L);
+        } else {
+            builder.setCurrentContentClaim(
+                (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER),
+                (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION),
+                (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER),
+                (Long) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET),
+                (Long) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE));
+        }
+
+        final Record previousClaimRecord = (Record) record.getFieldValue(EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM);
+        if (previousClaimRecord != null) {
+            builder.setPreviousContentClaim(
+                (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER),
+                (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION),
+                (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER),
+                (Long) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET),
+                (Long) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE));
+        }
+
+        return builder.build();
+    }
+
+    private static Map<String, String> truncateAttributes(final Map<String, String> attributes, final int maxAttributeLength) {
+        if (attributes == null) {
+            return null;
+        }
+
+        // Check if any attribute value exceeds the attribute length
+        final boolean anyExceedsLength = attributes.values().stream()
+            .filter(value -> value != null)
+            .anyMatch(value -> value.length() > maxAttributeLength);
+
+        if (!anyExceedsLength) {
+            return attributes;
+        }
+
+        final Map<String, String> truncated = new HashMap<>();
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+
+            if (value == null || value.length() <= maxAttributeLength) {
+                truncated.put(key, value);
+                continue;
+            }
+
+            truncated.put(key, value.substring(0, maxAttributeLength));
+        }
+
+        return truncated;
+    }
+}
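EventRecord is essentially an adapter: it exposes a ProvenanceEventRecord's values by schema field name so that a generic, schema-driven writer never needs to know the event class, and truncateAttributes() copies the attribute map only when some value actually exceeds the limit. A standalone sketch of the adapter idea (a toy Record interface and field names, not NiFi's org.apache.nifi.repository.schema.Record):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Toy stand-in for the Record interface: values are looked up by field name.
    interface RecordSketch {
        Object getFieldValue(String fieldName);
    }

    // Adapter over a hypothetical event object, mirroring EventRecord.getFieldValue().
    class EventRecordSketch implements RecordSketch {
        private final long eventId;
        private final String eventType;
        private final String componentId;

        EventRecordSketch(final long eventId, final String eventType, final String componentId) {
            this.eventId = eventId;
            this.eventType = eventType;
            this.componentId = componentId;
        }

        @Override
        public Object getFieldValue(final String fieldName) {
            switch (fieldName) {
                case "Event ID":
                    return eventId;
                case "Event Type":
                    return eventType;
                case "Component ID":
                    return componentId;
                default:
                    return null; // absent or unknown fields read as null
            }
        }
    }

    class SchemaDrivenWriterSketch {
        // A schema here is just an ordered list of field names; a schema-driven
        // writer walks the schema and pulls each value from the record by name.
        static Map<String, Object> toFieldMap(final RecordSketch record, final String... schemaFieldNames) {
            final Map<String, Object> values = new LinkedHashMap<>();
            for (final String name : schemaFieldNames) {
                values.put(name, record.getFieldValue(name));
            }
            return values;
        }
    }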

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
new file mode 100644
index 0000000..0582dd8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import static org.apache.nifi.repository.schema.Repetition.EXACTLY_ONE;
+import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_MORE;
+import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_ONE;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class EventRecordFields {
+
+    public static class Names {
+        public static final String EVENT_IDENTIFIER = "Event ID";
+        public static final String EVENT_TYPE = "Event Type";
+        public static final String EVENT_TIME = "Event Time";
+        public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date";
+        public static final String EVENT_DURATION = "Event Duration";
+        public static final String LINEAGE_START_DATE = "Lineage Start Date";
+        public static final String COMPONENT_ID = "Component ID";
+        public static final String COMPONENT_TYPE = "Component Type";
+        public static final String FLOWFILE_UUID = "FlowFile UUID";
+        public static final String EVENT_DETAILS = "Event Details";
+        public static final String SOURCE_QUEUE_IDENTIFIER = "Source Queue Identifier";
+        public static final String CONTENT_CLAIM = "Content Claim";
+        public static final String PREVIOUS_CONTENT_CLAIM = "Previous Content Claim";
+        public static final String PARENT_UUIDS = "Parent UUIDs";
+        public static final String CHILD_UUIDS = "Child UUIDs";
+
+        public static final String ATTRIBUTE_NAME = "Attribute Name";
+        public static final String ATTRIBUTE_VALUE = "Attribute Value";
+        public static final String PREVIOUS_ATTRIBUTES = "Previous Attributes";
+        public static final String UPDATED_ATTRIBUTES = "Updated Attributes";
+
+        public static final String CONTENT_CLAIM_CONTAINER = "Content Claim Container";
+        public static final String CONTENT_CLAIM_SECTION = "Content Claim Section";
+        public static final String CONTENT_CLAIM_IDENTIFIER = "Content Claim Identifier";
+        public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset";
+        public static final String CONTENT_CLAIM_SIZE = "Content Claim Size";
+
+        public static final String TRANSIT_URI = "Transit URI";
+        public static final String SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = "Source System FlowFile Identifier";
+        public static final String ALTERNATE_IDENTIFIER = "Alternate Identifier";
+        public static final String RELATIONSHIP = "Relationship";
+    }
+
+    // General Event fields.
+    public static final RecordField RECORD_IDENTIFIER = new SimpleRecordField(Names.EVENT_IDENTIFIER, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField EVENT_TYPE = new SimpleRecordField(Names.EVENT_TYPE, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField EVENT_TIME = new SimpleRecordField(Names.EVENT_TIME, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField FLOWFILE_ENTRY_DATE = new SimpleRecordField(Names.FLOWFILE_ENTRY_DATE, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField EVENT_DURATION = new SimpleRecordField(Names.EVENT_DURATION, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField LINEAGE_START_DATE = new SimpleRecordField(Names.LINEAGE_START_DATE, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField COMPONENT_ID = new SimpleRecordField(Names.COMPONENT_ID, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField COMPONENT_TYPE = new SimpleRecordField(Names.COMPONENT_TYPE, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField FLOWFILE_UUID = new SimpleRecordField(Names.FLOWFILE_UUID, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField EVENT_DETAILS = new SimpleRecordField(Names.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField SOURCE_QUEUE_IDENTIFIER = new SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+
+    // Attributes
+    public static final RecordField ATTRIBUTE_NAME = new SimpleRecordField(Names.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE);
+    public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, EXACTLY_ONE);
+    public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, ZERO_OR_ONE);
+
+    public static final RecordField PREVIOUS_ATTRIBUTES = new MapRecordField(Names.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE);
+    public static final RecordField UPDATED_ATTRIBUTES = new MapRecordField(Names.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE);
+
+    // Content Claims
+    public static final RecordField CONTENT_CLAIM_CONTAINER = new SimpleRecordField(Names.CONTENT_CLAIM_CONTAINER, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_SECTION = new SimpleRecordField(Names.CONTENT_CLAIM_SECTION, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_IDENTIFIER = new SimpleRecordField(Names.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_OFFSET = new SimpleRecordField(Names.CONTENT_CLAIM_OFFSET, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(Names.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE);
+    public static final RecordField CURRENT_CONTENT_CLAIM = new ComplexRecordField(Names.CONTENT_CLAIM, ZERO_OR_ONE,
+        CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+    public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(Names.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
+        CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+
+    // EventType-Specific fields
+    // for FORK, JOIN, CLONE, REPLAY
+    public static final RecordField PARENT_UUIDS = new SimpleRecordField(Names.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+    public static final RecordField CHILD_UUIDS = new SimpleRecordField(Names.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+
+    // for SEND/RECEIVE/FETCH
+    public static final RecordField TRANSIT_URI = new SimpleRecordField(Names.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new SimpleRecordField(Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+
+    // for ADDINFO (Alternate Identifier) and ROUTE (Relationship)
+    public static final RecordField ALTERNATE_IDENTIFIER = new SimpleRecordField(Names.ALTERNATE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+    public static final RecordField RELATIONSHIP = new SimpleRecordField(Names.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
new file mode 100644
index 0000000..d70bd39
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import static org.apache.nifi.provenance.schema.EventRecordFields.ALTERNATE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.CHILD_UUIDS;
+import static org.apache.nifi.provenance.schema.EventRecordFields.COMPONENT_ID;
+import static org.apache.nifi.provenance.schema.EventRecordFields.COMPONENT_TYPE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.CURRENT_CONTENT_CLAIM;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_DETAILS;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_DURATION;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_TIME;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_TYPE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.FLOWFILE_ENTRY_DATE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.FLOWFILE_UUID;
+import static org.apache.nifi.provenance.schema.EventRecordFields.LINEAGE_START_DATE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.PARENT_UUIDS;
+import static org.apache.nifi.provenance.schema.EventRecordFields.PREVIOUS_ATTRIBUTES;
+import static org.apache.nifi.provenance.schema.EventRecordFields.PREVIOUS_CONTENT_CLAIM;
+import static org.apache.nifi.provenance.schema.EventRecordFields.RECORD_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.RELATIONSHIP;
+import static org.apache.nifi.provenance.schema.EventRecordFields.SOURCE_QUEUE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.TRANSIT_URI;
+import static org.apache.nifi.provenance.schema.EventRecordFields.UPDATED_ATTRIBUTES;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ProvenanceEventSchema {
+    public static final RecordSchema PROVENANCE_EVENT_SCHEMA_V1 = buildSchemaV1();
+
+    private static RecordSchema buildSchemaV1() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(RECORD_IDENTIFIER);
+        fields.add(EVENT_TYPE);
+        fields.add(EVENT_TIME);
+        fields.add(FLOWFILE_ENTRY_DATE);
+        fields.add(EVENT_DURATION);
+        fields.add(LINEAGE_START_DATE);
+        fields.add(COMPONENT_ID);
+        fields.add(COMPONENT_TYPE);
+        fields.add(FLOWFILE_UUID);
+        fields.add(EVENT_DETAILS);
+        fields.add(PREVIOUS_ATTRIBUTES);
+        fields.add(UPDATED_ATTRIBUTES);
+        fields.add(CURRENT_CONTENT_CLAIM);
+        fields.add(PREVIOUS_CONTENT_CLAIM);
+        fields.add(SOURCE_QUEUE_IDENTIFIER);
+
+        // EventType-Specific fields
+        fields.add(PARENT_UUIDS);  // for FORK, JOIN, CLONE, REPLAY events
+        fields.add(CHILD_UUIDS); // for FORK, JOIN, CLONE, REPLAY events
+        fields.add(TRANSIT_URI); // for SEND/RECEIVE/FETCH events
+        fields.add(SOURCE_SYSTEM_FLOWFILE_IDENTIFIER); // for SEND/RECEIVE events
+        fields.add(ALTERNATE_IDENTIFIER); // for ADD_INFO events
+        fields.add(RELATIONSHIP); // for ROUTE events
+
+        final RecordSchema schema = new RecordSchema(fields);
+        return schema;
+    }
+}
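The V1 schema above lists every field an event can carry; the type-specific fields at the end are simply left null for events that do not use them. A hedged sketch of how the pieces in this commit fit together, using only constructors visible in this diff; the content-claim sub-schema and the wiring are assumptions, not necessarily how the new schema-based writer assembles them:

    import java.util.Arrays;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.schema.EventRecord;
    import org.apache.nifi.provenance.schema.EventRecordFields;
    import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
    import org.apache.nifi.repository.schema.Record;
    import org.apache.nifi.repository.schema.RecordSchema;

    class SchemaUsageSketch {
        // Wrap an event so a schema-driven serializer can read it field by field.
        static Record wrap(final ProvenanceEventRecord event, final long eventId) {
            // Assumed sub-schema for content-claim sub-records, built from the
            // five CONTENT_CLAIM_* fields defined in EventRecordFields.
            final RecordSchema contentClaimSchema = new RecordSchema(Arrays.asList(
                EventRecordFields.CONTENT_CLAIM_CONTAINER,
                EventRecordFields.CONTENT_CLAIM_SECTION,
                EventRecordFields.CONTENT_CLAIM_IDENTIFIER,
                EventRecordFields.CONTENT_CLAIM_OFFSET,
                EventRecordFields.CONTENT_CLAIM_SIZE));

            return new EventRecord(event, eventId, ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1, contentClaimSchema);
        }
    }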

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
new file mode 100644
index 0000000..056829a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CompressableRecordReader implements RecordReader {
+    private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+    private final ByteCountingInputStream rawInputStream;
+    private final String filename;
+    private final int serializationVersion;
+    private final boolean compressed;
+    private final TocReader tocReader;
+    private final int headerLength;
+    private final int maxAttributeChars;
+
+    private DataInputStream dis;
+    private ByteCountingInputStream byteCountingIn;
+
+    public CompressableRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
+        this(in, filename, null, maxAttributeChars);
+    }
+
+    public CompressableRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+        logger.trace("Creating RecordReader for {}", filename);
+
+        rawInputStream = new ByteCountingInputStream(in);
+        this.maxAttributeChars = maxAttributeChars;
+
+        final InputStream limitedStream;
+        if (tocReader == null) {
+            limitedStream = rawInputStream;
+        } else {
+            final long offset1 = tocReader.getBlockOffset(1);
+            if (offset1 < 0) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
+        if (filename.endsWith(".gz")) {
+            readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+            compressed = true;
+        } else {
+            readableStream = new BufferedInputStream(limitedStream);
+            compressed = false;
+        }
+
+        byteCountingIn = new ByteCountingInputStream(readableStream);
+        dis = new DataInputStream(byteCountingIn);
+
+        final String repoClassName = dis.readUTF();
+        final int serializationVersion = dis.readInt();
+        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
+        this.serializationVersion = serializationVersion;
+        this.filename = filename;
+        this.tocReader = tocReader;
+
+        readHeader(dis, serializationVersion);
+    }
+
+    @Override
+    public void skipToBlock(final int blockIndex) throws IOException {
+        if (tocReader == null) {
+            throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+        }
+
+        if (blockIndex < 0) {
+            throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+        }
+
+        if (blockIndex == getBlockIndex()) {
+            return;
+        }
+
+        final long offset = tocReader.getBlockOffset(blockIndex);
+        if (offset < 0) {
+            throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+        }
+
+        final long curOffset = rawInputStream.getBytesConsumed();
+
+        final long bytesToSkip = offset - curOffset;
+        if (bytesToSkip >= 0) {
+            try {
+                StreamUtils.skip(rawInputStream, bytesToSkip);
+                logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+            } catch (final IOException e) {
+                throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+            }
+
+            resetStreamForNextBlock();
+        }
+    }
+
+    private void resetStreamForNextBlock() throws IOException {
+        final InputStream limitedStream;
+        if (tocReader == null) {
+            limitedStream = rawInputStream;
+        } else {
+            final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+            if (offset < 0) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
+        if (compressed) {
+            readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+        } else {
+            readableStream = new BufferedInputStream(limitedStream);
+        }
+
+        byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
+        dis = new DataInputStream(byteCountingIn);
+    }
+
+
+    @Override
+    public TocReader getTocReader() {
+        return tocReader;
+    }
+
+    @Override
+    public boolean isBlockIndexAvailable() {
+        return tocReader != null;
+    }
+
+    @Override
+    public int getBlockIndex() {
+        if (tocReader == null) {
+            throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+        }
+
+        return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+    }
+
+    @Override
+    public long getBytesConsumed() {
+        return byteCountingIn.getBytesConsumed();
+    }
+
+    private boolean isData() throws IOException {
+        byteCountingIn.mark(1);
+        int nextByte = byteCountingIn.read();
+        byteCountingIn.reset();
+
+        if (nextByte < 0) {
+            try {
+                resetStreamForNextBlock();
+            } catch (final EOFException eof) {
+                return false;
+            }
+
+            byteCountingIn.mark(1);
+            nextByte = byteCountingIn.read();
+            byteCountingIn.reset();
+        }
+
+        return nextByte >= 0;
+    }
+
+    @Override
+    public long getMaxEventId() throws IOException {
+        if (tocReader != null) {
+            final long lastBlockOffset = tocReader.getLastBlockOffset();
+            skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+        }
+
+        ProvenanceEventRecord record;
+        ProvenanceEventRecord lastRecord = null;
+        try {
+            while ((record = nextRecord()) != null) {
+                lastRecord = record;
+            }
+        } catch (final EOFException eof) {
+            // This can happen if NiFi is stopped while the record is being written.
+            // This is OK; we just ignore this record. The session will not have been
+            // committed, so we can just process the FlowFile again.
+        }
+
+        return lastRecord == null ? -1L : lastRecord.getEventId();
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.trace("Closing Record Reader for {}", filename);
+
+        try {
+            dis.close();
+        } finally {
+            try {
+                rawInputStream.close();
+            } finally {
+                if (tocReader != null) {
+                    tocReader.close();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void skip(final long bytesToSkip) throws IOException {
+        StreamUtils.skip(dis, bytesToSkip);
+    }
+
+    @Override
+    public void skipTo(final long position) throws IOException {
+        // We subtract headerLength from the number of bytes consumed because we used to
+        // consider the offset of the first record to be "0"; now we consider it to be whatever
+        // position it really is in the stream.
+        final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
+        if (currentPosition == position) {
+            return;
+        }
+        if (currentPosition > position) {
+            throw new IOException("Cannot skip to byte offset " + position + " in stream because already at byte offset " + currentPosition);
+        }
+
+        final long toSkip = position - currentPosition;
+        StreamUtils.skip(dis, toSkip);
+    }
+
+    protected String getFilename() {
+        return filename;
+    }
+
+    protected int getMaxAttributeLength() {
+        return maxAttributeChars;
+    }
+
+    @Override
+    public StandardProvenanceEventRecord nextRecord() throws IOException {
+        if (isData()) {
+            return nextRecord(dis, serializationVersion);
+        } else {
+            return null;
+        }
+    }
+
+    protected abstract StandardProvenanceEventRecord nextRecord(DataInputStream in, int serializationVersion) throws IOException;
+
+    protected void readHeader(DataInputStream in, int serializationVersion) throws IOException {
+    }
+}
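
For orientation, the class above is a template: it owns TOC-driven block skipping, optional GZIP decompression, and the serialization name/version header, and leaves event decoding to subclasses through the two protected hooks at the bottom. A minimal sketch of a hypothetical subclass (the class name and decoding logic below are illustrative only, not part of this change) could look like:

// Hypothetical sketch only; not part of this commit.
package org.apache.nifi.provenance.serialization;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.toc.TocReader;

public class ExampleRecordReader extends CompressableRecordReader {

    public ExampleRecordReader(final InputStream in, final String filename,
            final TocReader tocReader, final int maxAttributeChars) throws IOException {
        super(in, filename, tocReader, maxAttributeChars);
    }

    @Override
    protected void readHeader(final DataInputStream in, final int serializationVersion) throws IOException {
        // The base class has already consumed the serialization name and version;
        // any format-specific header fields would be read here.
    }

    @Override
    protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException {
        // Decode a single event from 'in' according to serializationVersion and return it.
        return null; // placeholder in this sketch
    }
}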

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
new file mode 100644
index 0000000..fa0e390
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.provenance.AbstractRecordWriter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CompressableRecordWriter extends AbstractRecordWriter {
+    private static final Logger logger = LoggerFactory.getLogger(CompressableRecordWriter.class);
+
+    private final FileOutputStream fos;
+    private final ByteCountingOutputStream rawOutStream;
+    private final boolean compressed;
+    private final int uncompressedBlockSize;
+
+    private DataOutputStream out;
+    private ByteCountingOutputStream byteCountingOut;
+    private long lastBlockOffset = 0L;
+    private int recordCount = 0;
+
+
+    public CompressableRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(file, writer);
+        logger.trace("Creating Record Writer for {}", file.getName());
+
+        this.compressed = compressed;
+        this.fos = new FileOutputStream(file);
+        rawOutStream = new ByteCountingOutputStream(fos);
+        this.uncompressedBlockSize = uncompressedBlockSize;
+    }
+
+    public CompressableRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(null, tocWriter);
+        this.fos = null;
+
+        this.compressed = compressed;
+        this.uncompressedBlockSize = uncompressedBlockSize;
+        this.rawOutStream = new ByteCountingOutputStream(out);
+    }
+
+
+    @Override
+    public synchronized void writeHeader(final long firstEventId) throws IOException {
+        if (isDirty()) {
+            throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
+        }
+
+        try {
+            lastBlockOffset = rawOutStream.getBytesWritten();
+            resetWriteStream(firstEventId);
+            out.writeUTF(getSerializationName());
+            out.writeInt(getSerializationVersion());
+            writeHeader(firstEventId, out);
+            out.flush();
+            lastBlockOffset = rawOutStream.getBytesWritten();
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        }
+    }
+
+
+
+    /**
+     * Resets the streams to prepare for a new block
+     *
+     * @param eventId the first id that will be written to the new block
+     * @throws IOException if unable to flush/close the current streams properly
+     */
+    private void resetWriteStream(final long eventId) throws IOException {
+        try {
+            if (out != null) {
+                out.flush();
+            }
+
+            final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+            final TocWriter tocWriter = getTocWriter();
+
+            final OutputStream writableStream;
+            if (compressed) {
+                // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+                // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+                // the underlying OutputStream in a NonCloseableOutputStream
+                // We don't have to check if the writer is dirty because we will have already checked before calling this method.
+                if (out != null) {
+                    out.close();
+                }
+
+                if (tocWriter != null) {
+                    tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
+                }
+
+                writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+            } else {
+                if (tocWriter != null) {
+                    tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
+                }
+
+                writableStream = new BufferedOutputStream(rawOutStream, 65536);
+            }
+
+            this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
+            this.out = new DataOutputStream(byteCountingOut);
+            resetDirtyFlag();
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        }
+    }
+
+
+
+    @Override
+    public long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException {
+        if (isDirty()) {
+            throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
+        }
+
+        try {
+            final long startBytes = byteCountingOut.getBytesWritten();
+
+            // add a new block to the TOC if needed.
+            if (getTocWriter() != null && (startBytes - lastBlockOffset >= uncompressedBlockSize)) {
+                lastBlockOffset = startBytes;
+
+                if (compressed) {
+                    // GZIPOutputStream finishes a compressed member only when it is closed, so to start a new
+                    // block we close the current stream (via resetWriteStream) and open a new one over the
+                    // same underlying OutputStream.
+                    resetWriteStream(recordIdentifier);
+                }
+            }
+
+            writeRecord(record, recordIdentifier, out);
+
+            recordCount++;
+            return byteCountingOut.getBytesWritten() - startBytes;
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public synchronized int getRecordsWritten() {
+        return recordCount;
+    }
+
+    @Override
+    protected OutputStream getBufferedOutputStream() {
+        return out;
+    }
+
+    @Override
+    protected OutputStream getUnderlyingOutputStream() {
+        return fos;
+    }
+
+    @Override
+    protected void syncUnderlyingOutputStream() throws IOException {
+        if (fos != null) {
+            fos.getFD().sync();
+        }
+    }
+
+    protected abstract void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException;
+
+    protected abstract void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException;
+
+    protected abstract int getSerializationVersion();
+
+    protected abstract String getSerializationName();
+}
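
Mirroring the reader, CompressableRecordWriter handles block rotation, optional compression, and the serialization name/version header, and delegates the actual encoding to the four abstract hooks above. A minimal sketch of a hypothetical subclass (class name, version number, and payload are illustrative only) might be:

// Hypothetical sketch only; not part of this commit.
package org.apache.nifi.provenance.serialization;

import java.io.File;
import java.io.IOException;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.toc.TocWriter;
import org.apache.nifi.stream.io.DataOutputStream;

public class ExampleRecordWriter extends CompressableRecordWriter {

    public ExampleRecordWriter(final File file, final TocWriter tocWriter,
            final boolean compressed, final int uncompressedBlockSize) throws IOException {
        super(file, tocWriter, compressed, uncompressedBlockSize);
    }

    @Override
    protected void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException {
        // The base class has already written the serialization name and version;
        // any format-specific header fields would be written here.
    }

    @Override
    protected void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException {
        // Encode a single event here. This sketch writes only a placeholder value;
        // a real implementation serializes the full event.
        out.writeInt(0);
    }

    @Override
    protected int getSerializationVersion() {
        return 1; // illustrative version number
    }

    @Override
    protected String getSerializationName() {
        return "org.apache.nifi.provenance.ExampleRecordWriter"; // illustrative name
    }
}

Note that the value returned by getSerializationName() is written into the file header, and it is exactly this string that the updated RecordReaders.newRecordReader (below) reads back to decide which reader implementation to construct.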

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
new file mode 100644
index 0000000..38a4cc9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.IOException;
+
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocReader;
+
+public class EmptyRecordReader implements RecordReader {
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public StandardProvenanceEventRecord nextRecord() throws IOException {
+        return null;
+    }
+
+    @Override
+    public void skip(long bytesToSkip) throws IOException {
+    }
+
+    @Override
+    public void skipTo(long position) throws IOException {
+    }
+
+    @Override
+    public void skipToBlock(int blockIndex) throws IOException {
+    }
+
+    @Override
+    public int getBlockIndex() {
+        return 0;
+    }
+
+    @Override
+    public boolean isBlockIndexAvailable() {
+        return false;
+    }
+
+    @Override
+    public TocReader getTocReader() {
+        return null;
+    }
+
+    @Override
+    public long getBytesConsumed() {
+        return 0;
+    }
+
+    @Override
+    public long getMaxEventId() throws IOException {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 7889cd6..24efcbd 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.provenance.serialization;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -23,8 +26,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.Collection;
+import java.util.zip.GZIPInputStream;
 
+import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
+import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
 import org.apache.nifi.provenance.StandardRecordReader;
+import org.apache.nifi.provenance.StandardRecordWriter;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
 import org.apache.nifi.provenance.toc.StandardTocReader;
 import org.apache.nifi.provenance.toc.TocReader;
@@ -101,11 +108,39 @@ public class RecordReaders {
             }
 
             final File tocFile = TocUtil.getTocFile(file);
-            if ( tocFile.exists() ) {
-                final TocReader tocReader = new StandardTocReader(tocFile);
-                return new StandardRecordReader(fis, filename, tocReader, maxAttributeChars);
-            } else {
-                return new StandardRecordReader(fis, filename, maxAttributeChars);
+
+            final InputStream bufferedInStream = new BufferedInputStream(fis);
+            final String serializationName;
+            try {
+                bufferedInStream.mark(4096);
+                final InputStream in = filename.endsWith(".gz") ? new GZIPInputStream(bufferedInStream) : bufferedInStream;
+                final DataInputStream dis = new DataInputStream(in);
+                serializationName = dis.readUTF();
+                bufferedInStream.reset();
+            } catch (final EOFException eof) {
+                return new EmptyRecordReader();
+            }
+
+            switch (serializationName) {
+                case StandardRecordWriter.SERIALIZATION_NAME: {
+                    if (tocFile.exists()) {
+                        final TocReader tocReader = new StandardTocReader(tocFile);
+                        return new StandardRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
+                    } else {
+                        return new StandardRecordReader(bufferedInStream, filename, maxAttributeChars);
+                    }
+                }
+                case ByteArraySchemaRecordWriter.SERIALIZATION_NAME: {
+                    if (tocFile.exists()) {
+                        final TocReader tocReader = new StandardTocReader(tocFile);
+                        return new ByteArraySchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
+                    } else {
+                        return new ByteArraySchemaRecordReader(bufferedInStream, filename, maxAttributeChars);
+                    }
+                }
+                default: {
+                    throw new IOException("Unable to read data from file " + file + " because the file was written using an unknown Serializer: " + serializationName);
+                }
             }
         } catch (final IOException ioe) {
             if ( fis != null ) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index b157ccc..17dd75c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -44,6 +44,13 @@ public interface RecordWriter extends Closeable {
     long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
 
     /**
+     * Flushes any data that is held in a buffer to the underlying storage mechanism
+     *
+     * @throws IOException if unable to flush the bytes
+     */
+    void flush() throws IOException;
+
+    /**
      * @return the number of Records that have been written to this RecordWriter
      */
     int getRecordsWritten();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index cf8f7b4..be4c9cf 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -19,7 +19,7 @@ package org.apache.nifi.provenance.serialization;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.nifi.provenance.StandardRecordWriter;
+import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
 import org.apache.nifi.provenance.toc.StandardTocWriter;
 import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.provenance.toc.TocWriter;
@@ -27,13 +27,13 @@ import org.apache.nifi.provenance.toc.TocWriter;
 public class RecordWriters {
     private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
 
-    public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
-        return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+    public static RecordWriter newSchemaRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
+        return newSchemaRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
     }
 
-    public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
+    public static RecordWriter newSchemaRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
         final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
-        return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
+        return new ByteArraySchemaRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
     }
 
 }
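
To see how the renamed factory and the new flush() method fit together, here is a hypothetical usage sketch; the file path and event id are illustrative, and the event-writing call is left commented because constructing a ProvenanceEventRecord is outside the scope of this sketch:

// Hypothetical usage sketch only; not part of this commit.
import java.io.File;
import java.io.IOException;

import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriters;

public class ExampleWriterUsage {
    public static void main(final String[] args) throws IOException {
        final File eventFile = new File("target/1.prov.gz");   // illustrative path: compressed event file with a TOC
        final RecordWriter writer = RecordWriters.newSchemaRecordWriter(eventFile, true, true);
        try {
            writer.writeHeader(1L);            // writes serialization name/version and records the block offset
            // writer.writeRecord(event, 1L);  // encodes one event; returns the number of bytes written
            writer.flush();                    // pushes buffered bytes to the underlying stream
        } finally {
            writer.close();                    // RecordWriter extends Closeable
        }
    }
}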