Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:03 UTC

[3/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 97226b2..46bea31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -22,26 +22,20 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.events.EventReporter;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -56,7 +50,9 @@ public class TestFileSystemSwapManager {
             final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
             Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final SwapContents swapContents = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
+            final FileSystemSwapManager swapManager = createSwapManager();
+            final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
+
             final List<FlowFileRecord> records = swapContents.getFlowFiles();
             assertEquals(10000, records.size());
 
@@ -67,53 +63,32 @@ public class TestFileSystemSwapManager {
         }
     }
 
-    @Test
-    public void testRoundTripSerializeDeserialize() throws IOException {
-        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
-        final Map<String, String> attrs = new HashMap<>();
-        for (int i = 0; i < 10000; i++) {
-            attrs.put("i", String.valueOf(i));
-            final FlowFileRecord ff = new TestFlowFile(attrs, i);
-            toSwap.add(ff);
-        }
-
-        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
-        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-        final String swapLocation = "target/testRoundTrip.swap";
-        final File swapFile = new File(swapLocation);
-        Files.deleteIfExists(swapFile.toPath());
+    private FileSystemSwapManager createSwapManager() {
+        final FileSystemSwapManager swapManager = new FileSystemSwapManager();
+        final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
+        final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class);
+        swapManager.initialize(new SwapManagerInitializationContext() {
+            @Override
+            public ResourceClaimManager getResourceClaimManager() {
+                return resourceClaimManager;
+            }
 
-        try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
-            FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
-        }
+            @Override
+            public FlowFileRepository getFlowFileRepository() {
+                return flowfileRepo;
+            }
 
-        final SwapContents swappedIn;
-        try (final FileInputStream fis = new FileInputStream(swapFile);
-                final DataInputStream dis = new DataInputStream(fis)) {
-            swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
-        }
+            @Override
+            public EventReporter getEventReporter() {
+                return EventReporter.NO_OP;
+            }
+        });
 
-        assertEquals(toSwap.size(), swappedIn.getFlowFiles().size());
-        for (int i = 0; i < toSwap.size(); i++) {
-            final FlowFileRecord pre = toSwap.get(i);
-            final FlowFileRecord post = swappedIn.getFlowFiles().get(i);
-
-            assertEquals(pre.getSize(), post.getSize());
-            assertEquals(pre.getAttributes(), post.getAttributes());
-            assertEquals(pre.getSize(), post.getSize());
-            assertEquals(pre.getId(), post.getId());
-            assertEquals(pre.getContentClaim(), post.getContentClaim());
-            assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
-            assertEquals(pre.getEntryDate(), post.getEntryDate());
-            assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
-            assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
-            assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
-        }
+        return swapManager;
     }
 
     public class NopResourceClaimManager implements ResourceClaimManager {
-
         @Override
         public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
             return null;
@@ -165,89 +140,4 @@ public class TestFileSystemSwapManager {
         }
     }
 
-    private static class TestFlowFile implements FlowFileRecord {
-
-        private static final AtomicLong idGenerator = new AtomicLong(0L);
-
-        private final long id = idGenerator.getAndIncrement();
-        private final long entryDate = System.currentTimeMillis();
-        private final long lastQueueDate = System.currentTimeMillis();
-        private final Map<String, String> attributes;
-        private final long size;
-
-        public TestFlowFile(final Map<String, String> attributes, final long size) {
-            this.attributes = attributes;
-            this.size = size;
-        }
-
-        @Override
-        public long getId() {
-            return id;
-        }
-
-        @Override
-        public long getEntryDate() {
-            return entryDate;
-        }
-
-        @Override
-        public long getLineageStartDate() {
-            return entryDate;
-        }
-
-        @Override
-        public Long getLastQueueDate() {
-            return lastQueueDate;
-        }
-
-        @Override
-        public boolean isPenalized() {
-            return false;
-        }
-
-        @Override
-        public String getAttribute(String key) {
-            return attributes.get(key);
-        }
-
-        @Override
-        public long getSize() {
-            return size;
-        }
-
-        @Override
-        public Map<String, String> getAttributes() {
-            return Collections.unmodifiableMap(attributes);
-        }
-
-        @Override
-        public int compareTo(final FlowFile o) {
-            return Long.compare(id, o.getId());
-        }
-
-        @Override
-        public long getPenaltyExpirationMillis() {
-            return -1L;
-        }
-
-        @Override
-        public ContentClaim getContentClaim() {
-            return null;
-        }
-
-        @Override
-        public long getContentClaimOffset() {
-            return 0;
-        }
-
-        @Override
-        public long getLineageStartIndex() {
-            return 0;
-        }
-
-        @Override
-        public long getQueueDateIndex() {
-            return 0;
-        }
-    }
 }
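FileSystemSwapManager.peek() deserializes a swap file's contents while leaving the
file on disk, which is what lets the checked-in old-swap-file.swap fixture be
re-read on every test run (swapIn(), by contrast, consumes the file). A minimal
sketch of the call pattern, reusing the createSwapManager() helper from this diff:

    final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
    Mockito.when(queue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");

    final FileSystemSwapManager swapManager = createSwapManager();
    // peek() only deserializes; the swap file remains in place for the next run
    final SwapContents contents = swapManager.peek("src/test/resources/old-swap-file.swap", queue);
    assertEquals(10000, contents.getFlowFiles().size());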

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index b2ea0b9..6525822 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -49,6 +49,7 @@ import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -64,6 +65,7 @@ public class TestWriteAheadFlowFileRepository {
     }
 
     @Before
+    @After
     public void clearRepo() throws IOException {
         final File target = new File("target");
         final File testRepo = new File(target, "test-repo");
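Annotating a single method with both @Before and @After is legal in JUnit 4: the
method runs before and after every test, so the repository directory is wiped even
when a failing test leaves files behind. A self-contained sketch of the same
cleanup pattern using only JDK I/O (the real test's delete helper is not shown in
this hunk):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.stream.Stream;

    import org.junit.After;
    import org.junit.Before;

    public class RepoCleanupSketch {
        @Before
        @After
        public void clearRepo() throws IOException {
            final Path testRepo = new File(new File("target"), "test-repo").toPath();
            if (Files.exists(testRepo)) {
                try (final Stream<Path> paths = Files.walk(testRepo)) {
                    // delete children before their parent directories
                    paths.sorted(Comparator.reverseOrder())
                        .map(Path::toFile)
                        .forEach(File::delete);
                }
            }
        }
    }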

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java
new file mode 100644
index 0000000..7b5f72c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/MockFlowFile.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+
+public class MockFlowFile implements FlowFileRecord {
+    private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+    private final long id;
+    private final long entryDate = System.currentTimeMillis();
+    private final long lastQueueDate = System.currentTimeMillis();
+    private final Map<String, String> attributes;
+    private final long size;
+    private final ContentClaim contentClaim;
+
+    public MockFlowFile(final Map<String, String> attributes, final long size, final ResourceClaimManager claimManager) {
+        this(attributes, size, createContentClaim(String.valueOf(idGenerator.get()), claimManager));
+    }
+
+    public MockFlowFile(final Map<String, String> attributes, final long size, final ContentClaim contentClaim) {
+        this(idGenerator.getAndIncrement(), attributes, size, contentClaim);
+    }
+
+    public MockFlowFile(final long id, final Map<String, String> attributes, final long size, final ContentClaim contentClaim) {
+        this.id = id;
+        this.attributes = new HashMap<>(attributes);
+        this.size = size;
+        this.contentClaim = contentClaim;
+    }
+
+    public static ContentClaim createContentClaim(final String id, final ResourceClaimManager claimManager) {
+        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", id, false, false);
+        claimManager.incrementClaimantCount(resourceClaim);
+        return new StandardContentClaim(resourceClaim, 3L);
+    }
+
+    @Override
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public long getEntryDate() {
+        return entryDate;
+    }
+
+    @Override
+    public long getLineageStartDate() {
+        return entryDate;
+    }
+
+    @Override
+    public Long getLastQueueDate() {
+        return lastQueueDate;
+    }
+
+    @Override
+    public boolean isPenalized() {
+        return false;
+    }
+
+    @Override
+    public String getAttribute(String key) {
+        return attributes.get(key);
+    }
+
+    @Override
+    public long getSize() {
+        return size;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        return Collections.unmodifiableMap(attributes);
+    }
+
+    @Override
+    public int compareTo(final FlowFile o) {
+        return Long.compare(id, o.getId());
+    }
+
+    @Override
+    public long getPenaltyExpirationMillis() {
+        return -1L;
+    }
+
+    @Override
+    public ContentClaim getContentClaim() {
+        return contentClaim;
+    }
+
+    @Override
+    public long getContentClaimOffset() {
+        return 1;
+    }
+
+    @Override
+    public long getLineageStartIndex() {
+        return 0;
+    }
+
+    @Override
+    public long getQueueDateIndex() {
+        return 0;
+    }
+
+    public static void resetIdGenerator() {
+        idGenerator.set(0L);
+    }
+}
\ No newline at end of file
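MockFlowFile hands out ids from a static AtomicLong that is shared across every
instance in the JVM, so tests that assert on absolute ids (for example, the
getMaxFlowFileId() checks of 9999 in the serializer tests below) call
resetIdGenerator() in a @Before method. A minimal illustration of why the reset
matters:

    MockFlowFile.resetIdGenerator();
    final FlowFileRecord first = new MockFlowFile(
        Collections.emptyMap(), 0L, new StandardResourceClaimManager());
    // deterministic only because of the reset; an earlier test in the same JVM
    // would otherwise have advanced the shared counter
    assertEquals(0L, first.getId());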

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java
new file mode 100644
index 0000000..8565f38
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSchemaSwapSerializerDeserializer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestSchemaSwapSerializerDeserializer {
+
+    @Before
+    public void setup() {
+        MockFlowFile.resetIdGenerator();
+    }
+
+    @Test
+    public void testRoundTripSerializeDeserializeSummary() throws IOException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+        final ResourceClaim firstResourceClaim = resourceClaimManager.newResourceClaim("container", "section", "id", true, false);
+        resourceClaimManager.incrementClaimantCount(firstResourceClaim);
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        long size = 0L;
+        final ContentClaim firstClaim = MockFlowFile.createContentClaim("id", resourceClaimManager);
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = i < 2 ? new MockFlowFile(attrs, i, firstClaim) : new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+            size += i;
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath());
+
+        final SwapSerializer serializer = new SchemaSwapSerializer();
+        try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+            serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+        }
+
+        final SwapDeserializer deserializer = new SchemaSwapDeserializer();
+        final SwapSummary swapSummary;
+        try (final FileInputStream fis = new FileInputStream(swapFile);
+            final DataInputStream dis = new DataInputStream(fis)) {
+
+            swapSummary = deserializer.getSwapSummary(dis, swapLocation, resourceClaimManager);
+        }
+
+        assertEquals(10000, swapSummary.getQueueSize().getObjectCount());
+        assertEquals(size, swapSummary.getQueueSize().getByteCount());
+        assertEquals(9999, swapSummary.getMaxFlowFileId().intValue());
+
+        final List<ResourceClaim> resourceClaims = swapSummary.getResourceClaims();
+        assertEquals(10000, resourceClaims.size());
+        assertFalse(resourceClaims.stream().anyMatch(claim -> claim == null));
+        assertEquals(2, resourceClaims.stream().filter(claim -> claim.getId().equals("id")).collect(Collectors.counting()).intValue());
+
+        final Set<ResourceClaim> uniqueClaims = new HashSet<>(resourceClaims);
+        assertEquals(9999, uniqueClaims.size());
+    }
+
+    @Test
+    public void testRoundTripSerializeDeserializeFullSwapFile() throws IOException, InterruptedException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        long size = 0L;
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+            size += i;
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath());
+
+        final SwapSerializer serializer = new SchemaSwapSerializer();
+        try (final OutputStream fos = new FileOutputStream(swapFile);
+            final OutputStream out = new BufferedOutputStream(fos)) {
+            serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
+        }
+
+        final SwapContents contents;
+        final SwapDeserializer deserializer = new SchemaSwapDeserializer();
+        try (final FileInputStream fis = new FileInputStream(swapFile);
+            final InputStream bufferedIn = new BufferedInputStream(fis);
+            final DataInputStream dis = new DataInputStream(bufferedIn)) {
+
+            contents = deserializer.deserializeFlowFiles(dis, swapLocation, flowFileQueue, resourceClaimManager);
+        }
+
+        final SwapSummary swapSummary = contents.getSummary();
+        assertEquals(10000, swapSummary.getQueueSize().getObjectCount());
+        assertEquals(size, swapSummary.getQueueSize().getByteCount());
+        assertEquals(9999, swapSummary.getMaxFlowFileId().intValue());
+
+        assertEquals(10000, contents.getFlowFiles().size());
+
+        int counter = 0;
+        for (final FlowFileRecord flowFile : contents.getFlowFiles()) {
+            final int i = counter++;
+            assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
+            assertEquals(i, flowFile.getSize());
+        }
+    }
+
+    @Test
+    @Ignore("For manual testing, in order to ensure that changes do not negatively impact performance")
+    public void testWritePerformance() throws IOException, InterruptedException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+
+        final int iterations = 1000;
+
+        final long start = System.nanoTime();
+        final SwapSerializer serializer = new SchemaSwapSerializer();
+        for (int i = 0; i < iterations; i++) {
+            try (final OutputStream out = new NullOutputStream()) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
+            }
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        System.out.println("Wrote " + iterations + " Swap Files in " + millis + " millis");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java
new file mode 100644
index 0000000..0458333
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/swap/TestSimpleSwapSerializerDeserializer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestSimpleSwapSerializerDeserializer {
+    @Before
+    public void setup() {
+        MockFlowFile.resetIdGenerator();
+    }
+
+    @Test
+    public void testRoundTripSerializeDeserialize() throws IOException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+        }
+
+        final String queueId = "87bb99fe-412c-49f6-a441-d1b0af4e20b4";
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn(queueId);
+
+        final String swapLocation = "target/testRoundTrip-" + queueId + ".swap";
+        final File swapFile = new File(swapLocation);
+
+        Files.deleteIfExists(swapFile.toPath());
+        try {
+            final SimpleSwapSerializer serializer = new SimpleSwapSerializer();
+            try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+            }
+
+            final SimpleSwapDeserializer deserializer = new SimpleSwapDeserializer();
+            final SwapContents swappedIn;
+            try (final FileInputStream fis = new FileInputStream(swapFile);
+                final DataInputStream dis = new DataInputStream(fis)) {
+                swappedIn = deserializer.deserializeFlowFiles(dis, swapLocation, flowFileQueue, resourceClaimManager);
+            }
+
+            assertEquals(toSwap.size(), swappedIn.getFlowFiles().size());
+            for (int i = 0; i < toSwap.size(); i++) {
+                final FlowFileRecord pre = toSwap.get(i);
+                final FlowFileRecord post = swappedIn.getFlowFiles().get(i);
+
+                assertEquals(pre.getSize(), post.getSize());
+                assertEquals(pre.getAttributes(), post.getAttributes());
+                assertEquals(pre.getSize(), post.getSize());
+                assertEquals(pre.getId(), post.getId());
+                assertEquals(pre.getContentClaim(), post.getContentClaim());
+                assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
+                assertEquals(pre.getEntryDate(), post.getEntryDate());
+                assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
+                assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
+                assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
+            }
+        } finally {
+            Files.deleteIfExists(swapFile.toPath());
+        }
+    }
+
+    @Test
+    @Ignore("For manual testing only. Not intended to be run as part of the automated unit tests but can "
+        + "be convenient for determining a baseline for performance if making modifications.")
+    public void testWritePerformance() throws IOException, InterruptedException {
+        final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new MockFlowFile(attrs, i, resourceClaimManager);
+            toSwap.add(ff);
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+
+        final int iterations = 1000;
+
+        final long start = System.nanoTime();
+        final SwapSerializer serializer = new SimpleSwapSerializer();
+        for (int i = 0; i < iterations; i++) {
+            try (final OutputStream out = new NullOutputStream()) {
+                serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
+            }
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        System.out.println("Wrote " + iterations + " Swap Files in " + millis + " millis");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
index 47e7bcd..52e53a8 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
@@ -36,6 +36,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
new file mode 100644
index 0000000..2c84861
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractRecordWriter implements RecordWriter {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractRecordWriter.class);
+
+    private final File file;
+    private final TocWriter tocWriter;
+    private final Lock lock = new ReentrantLock();
+
+    private volatile boolean dirty = false;
+    private volatile boolean closed = false;
+
+    private int recordsWritten = 0;
+
+    public AbstractRecordWriter(final File file, final TocWriter writer) throws IOException {
+        logger.trace("Creating Record Writer for {}", file);
+
+        this.file = file;
+        this.tocWriter = writer;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        closed = true;
+
+        logger.trace("Closing Record Writer for {}", file == null ? null : file.getName());
+
+        lock();
+        try {
+            flush();
+
+            try {
+                // We want to close 'out' only if the writer is not 'dirty'.
+                // If the writer is dirty, then there was a failure to write
+                // to disk, which means that we likely have a partial record written
+                // to disk.
+                //
+                // If we call close() on out, it will in turn call flush() on the underlying
+                // output stream, which is a BufferedOutputStream. As a result, we will end
+                // up flushing the buffer after a partially written record, which results in
+                // essentially random bytes being written to the repository, which causes
+                // corruption and unrecoverability. Since we will close the underlying 'rawOutStream'
+                // below, we will still appropriately clean up the resources held by this writer, so
+                // we are still OK in terms of closing all resources held by the writer.
+                final OutputStream buffered = getBufferedOutputStream();
+                if (buffered != null && !isDirty()) {
+                    buffered.close();
+                }
+            } finally {
+                final OutputStream underlying = getUnderlyingOutputStream();
+                if (underlying != null) {
+                    try {
+                        underlying.close();
+                    } finally {
+                        if (tocWriter != null) {
+                            tocWriter.close();
+                        }
+                    }
+                }
+            }
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        } finally {
+            unlock();
+        }
+    }
+
+    @Override
+    public int getRecordsWritten() {
+        return recordsWritten;
+    }
+
+    @Override
+    public File getFile() {
+        return file;
+    }
+
+    @Override
+    public void lock() {
+        lock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        lock.unlock();
+    }
+
+    @Override
+    public boolean tryLock() {
+        final boolean obtainedLock = lock.tryLock();
+        if (obtainedLock && isDirty()) {
+            // once we have obtained the lock, we need to check if the writer
+            // has been marked dirty. If so, we cannot write to the underlying
+            // file, so we need to unlock and return false. Otherwise, it's okay
+            // to write to the underlying file, so return true.
+            lock.unlock();
+            return false;
+        }
+        return obtainedLock;
+    }
+
+    @Override
+    public void markDirty() {
+        this.dirty = true;
+    }
+
+    public boolean isDirty() {
+        return dirty;
+    }
+
+    protected void resetDirtyFlag() {
+        this.dirty = false;
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            if (tocWriter != null) {
+                tocWriter.sync();
+            }
+
+            syncUnderlyingOutputStream();
+        } catch (final IOException ioe) {
+            markDirty();
+            throw ioe;
+        }
+    }
+
+    @Override
+    public TocWriter getTocWriter() {
+        return tocWriter;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    protected abstract OutputStream getBufferedOutputStream();
+
+    protected abstract OutputStream getUnderlyingOutputStream();
+
+    protected abstract void syncUnderlyingOutputStream() throws IOException;
+}
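The dirty flag ties the pieces above together: tryLock() refuses a writer that has
been poisoned, and close() deliberately skips flushing the buffered stream of a
dirty writer so that buffered bytes trailing a partially written record never reach
disk. A minimal caller sketch of the intended contract (the event and eventId
parameters are placeholders; writeRecord(), flush(), markDirty(), tryLock() and
unlock() are the RecordWriter methods used elsewhere in this commit):

    private void writeEvent(final RecordWriter writer, final ProvenanceEventRecord event,
            final long eventId) throws IOException {
        if (!writer.tryLock()) {
            return; // writer is busy, or has already been marked dirty
        }
        try {
            writer.writeRecord(event, eventId);
            writer.flush();
        } catch (final IOException ioe) {
            // a partial record may now be on disk; poison the writer so that
            // close() will not flush buffered garbage on top of it
            writer.markDirty();
            throw ioe;
        } finally {
            writer.unlock();
        }
    }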

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
new file mode 100644
index 0000000..297f084
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.provenance.schema.EventRecord;
+import org.apache.nifi.provenance.serialization.CompressableRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+
+public class ByteArraySchemaRecordReader extends CompressableRecordReader {
+    private RecordSchema schema; // effectively final
+    private SchemaRecordReader recordReader;  // effectively final
+
+    public ByteArraySchemaRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
+        super(in, filename, maxAttributeChars);
+    }
+
+    public ByteArraySchemaRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+        super(in, filename, tocReader, maxAttributeChars);
+    }
+
+    private void verifySerializationVersion(final int serializationVersion) {
+        if (serializationVersion > ByteArraySchemaRecordWriter.SERIALIZATION_VERSION) {
+            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion
+                + " and supported versions are 1-" + ByteArraySchemaRecordWriter.SERIALIZATION_VERSION);
+        }
+    }
+
+    @Override
+    protected void readHeader(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+        final int schemaLength = in.readInt();
+        final byte[] buffer = new byte[schemaLength];
+        StreamUtils.fillBuffer(in, buffer);
+
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(buffer)) {
+            schema = RecordSchema.readFrom(bais);
+        }
+
+        recordReader = SchemaRecordReader.fromSchema(schema);
+    }
+
+    @Override
+    protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+        final long byteOffset = getBytesConsumed();
+        final int recordLength = in.readInt();
+
+        final InputStream limitedIn = new LimitingInputStream(in, recordLength);
+        final Record eventRecord = recordReader.readRecord(limitedIn);
+
+        return EventRecord.getEvent(eventRecord, getFilename(), byteOffset, getMaxAttributeLength());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
new file mode 100644
index 0000000..cae2f40
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.provenance.schema.EventRecord;
+import org.apache.nifi.provenance.schema.EventRecordFields;
+import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
+import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.stream.io.DataOutputStream;
+
+public class ByteArraySchemaRecordWriter extends CompressableRecordWriter {
+    private static final RecordSchema eventSchema = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1;
+    private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+    public static final int SERIALIZATION_VERSION = 1;
+    public static final String SERIALIZATION_NAME = "ByteArraySchemaRecordWriter";
+
+    private final SchemaRecordWriter recordWriter = new SchemaRecordWriter();
+
+    public ByteArraySchemaRecordWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(file, tocWriter, compressed, uncompressedBlockSize);
+    }
+
+    public ByteArraySchemaRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+        super(out, tocWriter, compressed, uncompressedBlockSize);
+    }
+
+    @Override
+    protected String getSerializationName() {
+        return SERIALIZATION_NAME;
+    }
+
+    @Override
+    protected int getSerializationVersion() {
+        return SERIALIZATION_VERSION;
+    }
+
+    @Override
+    public void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        eventSchema.writeTo(baos);
+
+        out.writeInt(baos.size());
+        baos.writeTo(out);
+    }
+
+    protected Record createRecord(final ProvenanceEventRecord event, final long eventId) {
+        return new EventRecord(event, eventId, eventSchema, contentClaimSchema);
+    }
+
+    @Override
+    protected void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(256)) {
+
+            final Record eventRecord = createRecord(event, eventId);
+            recordWriter.writeRecord(eventRecord, baos);
+
+            out.writeInt(baos.size());
+            baos.writeTo(out);
+        }
+    }
+}
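Reader and writer agree on a simple length-prefixed framing: the header is a 4-byte
schema length followed by the serialized RecordSchema, and each event is a 4-byte
record length followed by the schema-encoded record (ByteArraySchemaRecordReader
above mirrors this with readInt() plus a LimitingInputStream). Because every record
carries its own length, a reader could skip an event without decoding it; a
hypothetical helper, not part of this commit:

    // Hypothetical: advance past one length-prefixed record without decoding it.
    private void skipRecord(final DataInputStream in) throws IOException {
        int remaining = in.readInt();           // 4-byte length prefix
        while (remaining > 0) {
            final int skipped = in.skipBytes(remaining);
            if (skipped <= 0) {
                throw new EOFException("record is truncated");
            }
            remaining -= skipped;
        }
    }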

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index f70bf7d..c20ce6e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -110,6 +110,11 @@ import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
+import org.apache.nifi.util.timebuffer.CountSizeEntityAccess;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimedCountSize;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,7 +125,6 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     private static final String FILE_EXTENSION = ".prov";
     private static final String TEMP_FILE_SUFFIX = ".prov.part";
     private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
-    public static final int SERIALIZATION_VERSION = 9;
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
     public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
@@ -180,6 +184,9 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     private Authorizer authorizer;  // effectively final
     private ProvenanceAuthorizableFactory resourceFactory;  // effectively final
 
+    private final TimedBuffer<TimedCountSize> updateCounts = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSizeEntityAccess());
+    private final TimedBuffer<TimestampedLong> backpressurePauseMillis = new TimedBuffer<>(TimeUnit.SECONDS, 300, new LongEntityAccess());
+
     /**
      * default no args constructor for service loading only.
      */
@@ -401,7 +408,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             final File journalDirectory = new File(storageDirectory, "journals");
             final File journalFile = new File(journalDirectory, String.valueOf(initialRecordId) + ".journal." + i);
 
-            writers[i] = RecordWriters.newRecordWriter(journalFile, false, false);
+            writers[i] = RecordWriters.newSchemaRecordWriter(journalFile, false, false);
             writers[i].writeHeader(initialRecordId);
         }
 
@@ -762,18 +769,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
 
             try {
                 try {
+                    long recordsWritten = 0L;
                     for (final ProvenanceEventRecord nextRecord : records) {
                         final long eventId = idGenerator.getAndIncrement();
                         bytesWritten += writer.writeRecord(nextRecord, eventId);
+                        recordsWritten++;
                         logger.trace("Wrote record with ID {} to {}", eventId, writer);
                     }
 
+                    writer.flush();
+
                     if (alwaysSync) {
                         writer.sync();
                     }
 
                     totalJournalSize = bytesWrittenSinceRollover.addAndGet(bytesWritten);
                     recordsWrittenSinceRollover.getAndIncrement();
+                    this.updateCounts.add(new TimedCountSize(recordsWritten, bytesWritten));
                 } catch (final Throwable t) {
                     // We need to set the repoDirty flag before we release the lock for this journal.
                     // Otherwise, another thread may write to this journal -- this is a problem because
@@ -1331,14 +1343,17 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                                     updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
                                 }
 
-                                logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+                                final TimedCountSize countSize = updateCounts.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES));
+                                logger.info("Successfully Rolled over Provenance Event file containing {} records. In the past 5 minutes, "
+                                    + "{} events have been written to the Provenance Repository, totaling {}",
+                                    recordsWritten, countSize.getCount(), FormatUtils.formatDataSize(countSize.getSize()));
                             }
 
                             //if files were rolled over or if out of retries stop the future
                             if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
 
                                 if (fileRolledOver == null && retryAttempts.get() == 0) {
-                                    logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+                                    logger.error("Failed to merge Journal Files {} after {} attempts.", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
                                 }
 
                                 rolloverCompletions.getAndIncrement();
@@ -1387,6 +1402,8 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             // max capacity for the repo, or if we have 5 sets of journal files waiting to be merged, we will block here until
             // that is no longer the case.
             if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
+                final long stopTheWorldStart = System.nanoTime();
+
                 logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
                         + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
                         + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
@@ -1428,8 +1445,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                     repoSize = getSize(getLogFiles(), 0L);
                 }
 
+                final long stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
+                backpressurePauseMillis.add(new TimestampedLong(stopTheWorldNanos));
+                final TimestampedLong pauseNanosLastFiveMinutes = backpressurePauseMillis.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES));
                 logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
-                        + "journal files to be rolled over is {}", journalFileCount);
+                    + "journal files to be rolled over is {}. Provenance Repository Back Pressure paused Session commits for {} ({} total in the last 5 minutes).",
+                    journalFileCount, FormatUtils.formatNanos(stopTheWorldNanos, true), FormatUtils.formatNanos(pauseNanosLastFiveMinutes.getValue(), true));
             }
 
             // we've finished rolling over successfully. Create new writers and reset state.
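
The two hunks above bracket the blocking section with System.nanoTime() so the log can report both the pause just taken and the five-minute total, again via a time-windowed buffer. The measurement pattern on its own, as a hedged sketch (class and field names hypothetical):

    import java.util.concurrent.TimeUnit;

    // Sketch of elapsed-time bookkeeping around a blocking section; not the repository's actual code.
    public class PauseTimer {
        private long totalPauseNanos; // the patch keeps a time-windowed buffer instead of a plain total

        public void runBlocking(final Runnable blockingWork) {
            final long start = System.nanoTime();
            try {
                blockingWork.run(); // e.g. waiting for journal rollover to catch up
            } finally {
                final long pauseNanos = System.nanoTime() - start;
                totalPauseNanos += pauseNanos;
                System.out.printf("Paused %d ms (total %d ms)%n",
                    TimeUnit.NANOSECONDS.toMillis(pauseNanos),
                    TimeUnit.NANOSECONDS.toMillis(totalPauseNanos));
            }
        }
    }

System.nanoTime() is the right clock here: it is monotonic, whereas System.currentTimeMillis() can jump if the wall clock is adjusted.
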
@@ -1635,7 +1656,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                     }
 
                     if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "re " + ioe.toString());
+                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
                     }
                 }
             }
@@ -1696,7 +1717,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
 
             // loop over each entry in the map, persisting the records to the merged file in order, and populating the map
             // with the next entry from the journal file from which the previous record was written.
-            try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
+            try (final RecordWriter writer = RecordWriters.newSchemaRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader(minEventId);
 
                 final IndexingAction indexingAction = createIndexingAction();
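
The comment above describes a k-way merge: always persist the smallest pending record across all journals, then refill from the journal that supplied it. A self-contained sketch of that pattern, with a PriorityQueue standing in for the ordered map the comment mentions and longs standing in for records:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMerge {
        private static class Head {
            final long value;            // stand-in for a record's event id
            final Iterator<Long> source; // the journal it came from
            Head(final long value, final Iterator<Long> source) {
                this.value = value;
                this.source = source;
            }
        }

        // Merges pre-sorted sources into one ascending stream, refilling from
        // whichever source supplied the value just emitted.
        static void merge(final List<Iterator<Long>> sources) {
            final PriorityQueue<Head> heads = new PriorityQueue<>((a, b) -> Long.compare(a.value, b.value));
            for (final Iterator<Long> source : sources) {
                if (source.hasNext()) {
                    heads.add(new Head(source.next(), source));
                }
            }
            while (!heads.isEmpty()) {
                final Head smallest = heads.poll();
                System.out.println(smallest.value); // "persist" the record
                if (smallest.source.hasNext()) {
                    heads.add(new Head(smallest.source.next(), smallest.source));
                }
            }
        }

        public static void main(final String[] args) {
            merge(Arrays.asList(
                Arrays.asList(1L, 4L, 9L).iterator(),
                Arrays.asList(2L, 3L, 8L).iterator())); // prints 1 2 3 4 8 9
        }
    }
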
@@ -1903,14 +1924,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
         boolean requireTruncation = false;
 
-        for (final Map.Entry<String, String> entry : original.getAttributes().entrySet()) {
-            if (entry.getValue().length() > maxAttributeChars) {
+        for (final String updatedAttr : original.getUpdatedAttributes().values()) {
+            if (updatedAttr != null && updatedAttr.length() > maxAttributeChars) {
                 requireTruncation = true;
                 break;
             }
         }
 
         if (!requireTruncation) {
+            for (final String previousAttr : original.getPreviousAttributes().values()) {
+                if (previousAttr != null && previousAttr.length() > maxAttributeChars) {
+                    requireTruncation = true;
+                    break;
+                }
+            }
+        }
+
+        if (!requireTruncation) {
             return original;
         }
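
The truncation guard now scans previous attributes as well as updated ones, short-circuiting as soon as any value exceeds the limit, so the common case of no oversized attributes stays cheap. Reduced to its essentials, with placeholder names:

    import java.util.Map;

    public class AttributeTruncationCheck {
        // True if any value in either attribute map exceeds maxChars.
        // Null values are tolerated, matching the checks in the patch above.
        static boolean requiresTruncation(final Map<String, String> updated,
                                          final Map<String, String> previous,
                                          final int maxChars) {
            for (final String value : updated.values()) {
                if (value != null && value.length() > maxChars) {
                    return true;
                }
            }
            for (final String value : previous.values()) {
                if (value != null && value.length() > maxChars) {
                    return true;
                }
            }
            return false;
        }
    }
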
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 2db9ed3..f018685 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -17,178 +17,36 @@
 package org.apache.nifi.provenance;
 
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Pattern;
-import java.util.zip.GZIPInputStream;
 
-import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.CompressableRecordReader;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StandardRecordReader implements RecordReader {
+public class StandardRecordReader extends CompressableRecordReader {
     private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
     private static final Pattern UUID_PATTERN = Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");
 
-    private final ByteCountingInputStream rawInputStream;
-    private final String filename;
-    private final int serializationVersion;
-    private final boolean compressed;
-    private final TocReader tocReader;
-    private final int headerLength;
-    private final int maxAttributeChars;
-
-    private DataInputStream dis;
-    private ByteCountingInputStream byteCountingIn;
-
     public StandardRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
         this(in, filename, null, maxAttributeChars);
     }
 
     public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+        super(in, filename, tocReader, maxAttributeChars);
         logger.trace("Creating RecordReader for {}", filename);
-
-        rawInputStream = new ByteCountingInputStream(in);
-        this.maxAttributeChars = maxAttributeChars;
-
-        final InputStream limitedStream;
-        if ( tocReader == null ) {
-            limitedStream = rawInputStream;
-        } else {
-            final long offset1 = tocReader.getBlockOffset(1);
-            if ( offset1 < 0 ) {
-                limitedStream = rawInputStream;
-            } else {
-                limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
-            }
-        }
-
-        final InputStream readableStream;
-        if (filename.endsWith(".gz")) {
-            readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
-            compressed = true;
-        } else {
-            readableStream = new BufferedInputStream(limitedStream);
-            compressed = false;
-        }
-
-        byteCountingIn = new ByteCountingInputStream(readableStream);
-        dis = new DataInputStream(byteCountingIn);
-
-        final String repoClassName = dis.readUTF();
-        final int serializationVersion = dis.readInt();
-        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
-
-        if (serializationVersion < 1 || serializationVersion > 9) {
-            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-9");
-        }
-
-        this.serializationVersion = serializationVersion;
-        this.filename = filename;
-        this.tocReader = tocReader;
-    }
-
-    @Override
-    public void skipToBlock(final int blockIndex) throws IOException {
-        if ( tocReader == null ) {
-            throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
-        }
-
-        if ( blockIndex < 0 ) {
-            throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
-        }
-
-        if ( blockIndex == getBlockIndex() ) {
-            return;
-        }
-
-        final long offset = tocReader.getBlockOffset(blockIndex);
-        if ( offset < 0 ) {
-            throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
-        }
-
-        final long curOffset = rawInputStream.getBytesConsumed();
-
-        final long bytesToSkip = offset - curOffset;
-        if ( bytesToSkip >= 0 ) {
-            try {
-                StreamUtils.skip(rawInputStream, bytesToSkip);
-                logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
-            } catch (final IOException e) {
-                throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
-            }
-
-            resetStreamForNextBlock();
-        }
-    }
-
-    private void resetStreamForNextBlock() throws IOException {
-        final InputStream limitedStream;
-        if ( tocReader == null ) {
-            limitedStream = rawInputStream;
-        } else {
-            final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
-            if ( offset < 0 ) {
-                limitedStream = rawInputStream;
-            } else {
-                limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
-            }
-        }
-
-        final InputStream readableStream;
-        if (compressed) {
-            readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
-        } else {
-            readableStream = new BufferedInputStream(limitedStream);
-        }
-
-        byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
-        dis = new DataInputStream(byteCountingIn);
-    }
-
-
-    @Override
-    public TocReader getTocReader() {
-        return tocReader;
-    }
-
-    @Override
-    public boolean isBlockIndexAvailable() {
-        return tocReader != null;
-    }
-
-    @Override
-    public int getBlockIndex() {
-        if ( tocReader == null ) {
-            throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
-        }
-
-        return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
     }
 
-    @Override
-    public long getBytesConsumed() {
-        return byteCountingIn.getBytesConsumed();
-    }
-
-    private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
-        final long startOffset = byteCountingIn.getBytesConsumed();
-
-        if (!isData()) {
-            return null;
-        }
 
+    private StandardProvenanceEventRecord readPreVersion6Record(final DataInputStream dis, final int serializationVersion) throws IOException {
+        final long startOffset = getBytesConsumed();
         final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
 
         final long eventId = dis.readLong();
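
The bulk of this deletion is stream plumbing (TOC-driven block limits, GZIP detection, byte counting, header parsing) moving into the new CompressableRecordReader base class, which then hands the decoded stream and serialization version to the subclass. The shape is a textbook template method; a minimal, self-contained sketch of that division of labor (names simplified, details elided):

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;

    // Base class owns stream setup; subclasses decode one record at a time.
    abstract class SketchRecordReader {
        private final DataInputStream dis;
        private final int serializationVersion;

        SketchRecordReader(final InputStream in, final String filename) throws IOException {
            final InputStream readable = filename.endsWith(".gz")
                ? new BufferedInputStream(new GZIPInputStream(in))
                : new BufferedInputStream(in);
            this.dis = new DataInputStream(readable);
            dis.readUTF();                          // header: repository class name
            this.serializationVersion = dis.readInt();
        }

        // Template method: the base class decides which stream and version to pass down.
        public final Object nextRecord() throws IOException {
            return nextRecord(dis, serializationVersion);
        }

        protected abstract Object nextRecord(DataInputStream dis, int serializationVersion) throws IOException;
    }

This is also why nextRecord and readUUID in the remaining hunks gain an explicit serializationVersion parameter: the field that used to hold it now lives in the base class.
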
@@ -254,7 +112,7 @@ public class StandardRecordReader implements RecordReader {
         builder.setAttributes(Collections.<String, String>emptyMap(), attrs);
         builder.setCurrentContentClaim(null, null, null, null, fileSize);
 
-        builder.setStorageLocation(filename, startOffset);
+        builder.setStorageLocation(getFilename(), startOffset);
 
         final StandardProvenanceEventRecord record = builder.build();
         record.setEventId(eventId);
@@ -262,17 +120,18 @@ public class StandardRecordReader implements RecordReader {
     }
 
     @Override
-    public StandardProvenanceEventRecord nextRecord() throws IOException {
+    public StandardProvenanceEventRecord nextRecord(final DataInputStream dis, final int serializationVersion) throws IOException {
+        if (serializationVersion > StandardRecordWriter.SERIALIZATION_VERISON) {
+            throw new IllegalArgumentException("Unable to deserialize record because the version is "
+                + serializationVersion + " and supported versions are 1-" + StandardRecordWriter.SERIALIZATION_VERISON);
+        }
+
         // Schema changed drastically in version 6 so we created a new method to handle old records
         if (serializationVersion < 6) {
-            return readPreVersion6Record();
+            return readPreVersion6Record(dis, serializationVersion);
         }
 
-        final long startOffset = byteCountingIn.getBytesConsumed();
-
-        if (!isData()) {
-            return null;
-        }
+        final long startOffset = getBytesConsumed();
 
         final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
 
@@ -288,7 +147,7 @@ public class StandardRecordReader implements RecordReader {
         if (serializationVersion < 9){
             final int numLineageIdentifiers = dis.readInt();
             for (int i = 0; i < numLineageIdentifiers; i++) {
-                readUUID(dis); //skip identifiers
+                readUUID(dis, serializationVersion); //skip identifiers
             }
         }
 
@@ -303,7 +162,7 @@ public class StandardRecordReader implements RecordReader {
         builder.setComponentId(readNullableString(dis));
         builder.setComponentType(readNullableString(dis));
 
-        final String uuid = readUUID(dis);
+        final String uuid = readUUID(dis, serializationVersion);
         builder.setFlowFileUUID(uuid);
         builder.setDetails(readNullableString(dis));
 
@@ -335,12 +194,12 @@ public class StandardRecordReader implements RecordReader {
         if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
             final int numParents = dis.readInt();
             for (int i = 0; i < numParents; i++) {
-                builder.addParentUuid(readUUID(dis));
+                builder.addParentUuid(readUUID(dis, serializationVersion));
             }
 
             final int numChildren = dis.readInt();
             for (int i = 0; i < numChildren; i++) {
-                builder.addChildUuid(readUUID(dis));
+                builder.addChildUuid(readUUID(dis, serializationVersion));
             }
         } else if (eventType == ProvenanceEventType.RECEIVE) {
             builder.setTransitUri(readNullableString(dis));
@@ -357,7 +216,7 @@ public class StandardRecordReader implements RecordReader {
 
         builder.setFlowFileEntryDate(flowFileEntryDate);
         builder.setLineageStartDate(lineageStartDate);
-        builder.setStorageLocation(filename, startOffset);
+        builder.setStorageLocation(getFilename(), startOffset);
 
         final StandardProvenanceEventRecord record = builder.build();
         record.setEventId(eventId);
@@ -373,8 +232,8 @@ public class StandardRecordReader implements RecordReader {
             final String truncatedValue;
             if (value == null) {
                 truncatedValue = null;
-            } else if (value.length() > maxAttributeChars) {
-                truncatedValue = value.substring(0, maxAttributeChars);
+            } else if (value.length() > getMaxAttributeLength()) {
+                truncatedValue = value.substring(0, getMaxAttributeLength());
             } else {
                 truncatedValue = value;
             }
@@ -385,8 +244,8 @@ public class StandardRecordReader implements RecordReader {
         return attrs;
     }
 
-    private String readUUID(final DataInputStream in) throws IOException {
-        if ( serializationVersion < 8 ) {
+    private String readUUID(final DataInputStream in, final int serializationVersion) throws IOException {
+        if (serializationVersion < 8) {
             final long msb = in.readLong();
             final long lsb = in.readLong();
             return new UUID(msb, lsb).toString();
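
Threading the version through readUUID preserves both wire formats: before version 8 a UUID was written as two raw longs, afterwards as a string. Round-tripping the two-long encoding is plain java.util.UUID usage:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.UUID;

    public class UuidWireFormat {
        public static void main(final String[] args) throws IOException {
            final UUID original = UUID.randomUUID();

            // Pre-version-8 encoding: most-significant then least-significant long.
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(original.getMostSignificantBits());
                out.writeLong(original.getLeastSignificantBits());
            }

            // Decode exactly as the reader does for serializationVersion < 8.
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                final UUID decoded = new UUID(in.readLong(), in.readLong());
                System.out.println(original.equals(decoded)); // true
            }
        }
    }
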
@@ -427,80 +286,4 @@ public class StandardRecordReader implements RecordReader {
         StreamUtils.fillBuffer(in, strBytes);
         return new String(strBytes, "UTF-8");
     }
-
-    private boolean isData() throws IOException {
-        byteCountingIn.mark(1);
-        int nextByte = byteCountingIn.read();
-        byteCountingIn.reset();
-
-        if ( nextByte < 0 ) {
-            try {
-                resetStreamForNextBlock();
-            } catch (final EOFException eof) {
-                return false;
-            }
-
-            byteCountingIn.mark(1);
-            nextByte = byteCountingIn.read();
-            byteCountingIn.reset();
-        }
-
-        return nextByte >= 0;
-    }
-
-    @Override
-    public long getMaxEventId() throws IOException {
-        if ( tocReader != null ) {
-            final long lastBlockOffset = tocReader.getLastBlockOffset();
-            skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
-        }
-
-        ProvenanceEventRecord record;
-        ProvenanceEventRecord lastRecord = null;
-        try {
-            while ((record = nextRecord()) != null) {
-                lastRecord = record;
-            }
-        } catch (final EOFException eof) {
-            // This can happen if we stop NiFi while the record is being written.
-            // This is OK, we just ignore this record. The session will not have been
-            // committed, so we can just process the FlowFile again.
-        }
-
-        return lastRecord == null ? -1L : lastRecord.getEventId();
-    }
-
-    @Override
-    public void close() throws IOException {
-        logger.trace("Closing Record Reader for {}", filename);
-
-        dis.close();
-        rawInputStream.close();
-
-        if ( tocReader != null ) {
-            tocReader.close();
-        }
-    }
-
-    @Override
-    public void skip(final long bytesToSkip) throws IOException {
-        StreamUtils.skip(dis, bytesToSkip);
-    }
-
-    @Override
-    public void skipTo(final long position) throws IOException {
-        // we are subtracting headerLength from the number of bytes consumed because we used to
-        // consider the offset of the first record "0" - now we consider it whatever position
-        // it really is in the stream.
-        final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
-        if (currentPosition == position) {
-            return;
-        }
-        if (currentPosition > position) {
-            throw new IOException("Cannot skip to byte offset " + position + " in stream because already at byte offset " + currentPosition);
-        }
-
-        final long toSkip = position - currentPosition;
-        StreamUtils.skip(dis, toSkip);
-    }
 }
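
Of the methods removed here in favor of the base class, isData() is the subtle one: it peeks a single byte with mark/reset to distinguish end-of-data from a block boundary before committing to a read. The peek idiom in isolation:

    import java.io.BufferedInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class PeekForData {
        // True if at least one more byte can be read, without consuming it.
        // Requires a stream that supports mark/reset, e.g. BufferedInputStream.
        static boolean isData(final InputStream in) throws IOException {
            in.mark(1);
            final int next = in.read();
            in.reset();
            return next >= 0;
        }

        public static void main(final String[] args) throws IOException {
            final InputStream in = new BufferedInputStream(new ByteArrayInputStream(new byte[] {42}));
            System.out.println(isData(in)); // true: a byte is available
            System.out.println(in.read());  // 42: the peek did not consume it
            System.out.println(isData(in)); // false: stream exhausted
        }
    }
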