Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:05 UTC

[5/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.
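
In brief, the schema-based approach means every repository file begins with a header that describes its own record layout, and readers decode records against the schema they recover from that header rather than against compiled-in field order; that is what makes rolling NiFi back to a previous version possible after an upgrade. What follows is a minimal, self-contained sketch of that idea using hypothetical types; it is not the real org.apache.nifi.repository.schema API used in this commit.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaSerializationSketch {

    // A schema here is just an ordered list of named, typed fields.
    enum FieldType { STRING, LONG }

    record Field(String name, FieldType type) {}

    static void writeSchema(final List<Field> schema, final DataOutputStream out) throws IOException {
        out.writeInt(schema.size());
        for (final Field field : schema) {
            out.writeUTF(field.name());
            out.writeUTF(field.type().name());
        }
    }

    static List<Field> readSchema(final DataInputStream in) throws IOException {
        final int fieldCount = in.readInt();
        final List<Field> schema = new ArrayList<>(fieldCount);
        for (int i = 0; i < fieldCount; i++) {
            schema.add(new Field(in.readUTF(), FieldType.valueOf(in.readUTF())));
        }
        return schema;
    }

    // Values are written strictly in schema order, so a reader needs only the
    // recovered schema, never compiled-in knowledge of the layout.
    static void writeRecord(final List<Field> schema, final Map<String, Object> values, final DataOutputStream out) throws IOException {
        for (final Field field : schema) {
            switch (field.type()) {
                case STRING -> out.writeUTF((String) values.get(field.name()));
                case LONG -> out.writeLong((Long) values.get(field.name()));
            }
        }
    }

    static Map<String, Object> readRecord(final List<Field> schema, final DataInputStream in) throws IOException {
        final Map<String, Object> values = new LinkedHashMap<>();
        for (final Field field : schema) {
            values.put(field.name(), switch (field.type()) {
                case STRING -> in.readUTF();
                case LONG -> in.readLong();
            });
        }
        return values;
    }

    public static void main(final String[] args) throws IOException {
        final List<Field> schema = List.of(new Field("id", FieldType.LONG), new Field("queueId", FieldType.STRING));
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeSchema(schema, out);                             // self-describing header
            writeRecord(schema, Map.<String, Object>of("id", 42L, "queueId", "q-1"), out);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            final List<Field> recovered = readSchema(in);         // the reader trusts the header...
            System.out.println(readRecord(recovered, in));        // ...and prints {id=42, queueId=q-1}
        }
    }
}

The point of the sketch is just the decode-by-recovered-schema loop; the real framework adds typed field definitions, repetition rules, and union records, as the diffs below show.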

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
new file mode 100644
index 0000000..916fd76
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap;
+import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+    private static final int MAX_ENCODING_VERSION = 1;
+
+    private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1;
+    private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
+
+    private final ResourceClaimManager resourceClaimManager;
+    private volatile RecordSchema recoverySchema;
+
+    public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
+        this.resourceClaimManager = resourceClaimManager;
+    }
+
+    @Override
+    public void writeHeader(final DataOutputStream out) throws IOException {
+        writeSchema.writeTo(out);
+    }
+
+    @Override
+    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException {
+        serializeRecord(newRecordState, out);
+    }
+
+    @Override
+    public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        final RecordSchema schema;
+        switch (record.getType()) {
+            case CREATE:
+            case UPDATE:
+                schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1;
+                break;
+            case CONTENTMISSING:
+            case DELETE:
+                schema = RepositoryRecordSchema.DELETE_SCHEMA_V1;
+                break;
+            case SWAP_IN:
+                schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1;
+                break;
+            case SWAP_OUT:
+                schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1;
+                break;
+            default:
+                throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen.
+        }
+
+        final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
+        final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        new SchemaRecordWriter().writeRecord(update, out);
+    }
+
+    @Override
+    public void readHeader(final DataInputStream in) throws IOException {
+        recoverySchema = RecordSchema.readFrom(in);
+    }
+
+    @Override
+    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
+        return deserializeRecord(in, version);
+    }
+
+    @Override
+    public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
+        final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
+        final Record updateRecord = reader.readRecord(in);
+
+        // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
+        // top level that indicates which type of record we have.
+        final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1);
+
+        final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
+        final UpdateType updateType = UpdateType.valueOf(actionType);
+        switch (updateType) {
+            case CREATE:
+                return createRecord(record);
+            case DELETE:
+                return deleteRecord(record);
+            case SWAP_IN:
+                return swapInRecord(record);
+            case SWAP_OUT:
+                return swapOutRecord(record);
+            case UPDATE:
+                return updateRecord(record);
+            default:
+                throw new IOException("Found unrecognized Update Type '" + actionType + "'");
+        }
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private StandardRepositoryRecord createRecord(final Record record) {
+        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        ffBuilder.id((Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID));
+        ffBuilder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE));
+
+        final Long lastQueueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE);
+        final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX);
+        ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+
+        final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE);
+        final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX);
+        ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+        populateContentClaim(ffBuilder, record);
+        ffBuilder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE));
+
+        ffBuilder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES));
+
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
+        final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+        return new StandardRepositoryRecord(queue, flowFileRecord);
+    }
+
+    private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) {
+        final Object claimMap = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
+        if (claimMap == null) {
+            return;
+        }
+
+        final Record claimRecord = (Record) claimMap;
+        final ContentClaim contentClaim = ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager);
+        final Long offset = ContentClaimFieldMap.getContentClaimOffset(claimRecord);
+
+        ffBuilder.contentClaim(contentClaim);
+        ffBuilder.contentClaimOffset(offset);
+    }
+
+    private RepositoryRecord updateRecord(final Record record) {
+        return createRecord(record);
+    }
+
+    private RepositoryRecord deleteRecord(final Record record) {
+        final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
+        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
+        final FlowFileRecord flowFileRecord = ffBuilder.build();
+
+        final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+        repoRecord.markForDelete();
+        return repoRecord;
+    }
+
+    private RepositoryRecord swapInRecord(final Record record) {
+        final StandardRepositoryRecord repoRecord = createRecord(record);
+        final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
+        repoRecord.setSwapLocation(swapLocation);
+        return repoRecord;
+    }
+
+    private RepositoryRecord swapOutRecord(final Record record) {
+        final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
+        final String queueId = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+        final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
+        final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(recordId)
+            .build();
+
+        return new StandardRepositoryRecord(queue, flowFileRecord, swapLocation);
+    }
+
+    @Override
+    public int getVersion() {
+        return MAX_ENCODING_VERSION;
+    }
+
+}
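
A quick usage sketch for the class above, using only methods that appear in this file (writeHeader/serializeRecord on the way out, readHeader/deserializeRecord on the way back). Assumptions: it lives alongside the class in org.apache.nifi.controller.repository, the queue map has already been populated (see serdeFactory.setQueueMap(...) in the WriteAheadFlowFileRepository diff below), and file, claimManager, and record are supplied by the caller.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.nifi.controller.repository.claim.ResourceClaimManager;

static void roundTrip(final Path file, final ResourceClaimManager claimManager, final RepositoryRecord record) throws IOException {
    final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(claimManager);

    try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
        serde.writeHeader(out);               // persists REPOSITORY_RECORD_SCHEMA_V1 into the header
        serde.serializeRecord(record, out);
    }

    try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
        serde.readHeader(in);                 // recoverySchema becomes whatever the file declares
        final RepositoryRecord restored = serde.deserializeRecord(in, serde.getVersion());
        // restored now carries the queue, content claim, and attributes decoded by schema
    }
}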

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a696e79..10f0d8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -49,8 +49,6 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
-import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
 import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
@@ -75,6 +73,8 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,8 +121,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private int removedCount = 0; // number of flowfiles removed in this session
     private long removedBytes = 0L; // size of all flowfiles removed in this session
-    private final AtomicLong bytesRead = new AtomicLong(0L);
-    private final AtomicLong bytesWritten = new AtomicLong(0L);
+    private long bytesRead = 0L;
+    private long bytesWritten = 0L;
     private int flowFilesIn = 0, flowFilesOut = 0;
     private long contentSizeIn = 0L, contentSizeOut = 0L;
 
@@ -975,8 +975,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final Connectable connectable = context.getConnectable();
         final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
-        flowFileEvent.setBytesRead(bytesRead.get());
-        flowFileEvent.setBytesWritten(bytesWritten.get());
+        flowFileEvent.setBytesRead(bytesRead);
+        flowFileEvent.setBytesWritten(bytesWritten);
 
         // update event repository
         try {
@@ -1064,8 +1064,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         flowFilesOut = 0;
         removedCount = 0;
         removedBytes = 0L;
-        bytesRead.set(0L);
-        bytesWritten.set(0L);
+        bytesRead = 0L;
+        bytesWritten = 0L;
         connectionCounts.clear();
         createdFlowFiles.clear();
         removedFlowFiles.clear();
@@ -2006,8 +2006,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
             if (allowCachingOfStream && recursionSet.isEmpty()) {
                 if (currentReadClaim == claim) {
-                    if (currentReadClaimStream != null && currentReadClaimStream.getStreamLocation() <= offset) {
-                        final long bytesToSkip = offset - currentReadClaimStream.getStreamLocation();
+                    if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
+                        final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
                         if (bytesToSkip > 0) {
                             StreamUtils.skip(currentReadClaimStream, bytesToSkip);
                         }
@@ -2023,7 +2023,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 }
 
                 currentReadClaim = claim;
-                currentReadClaimStream = new ByteCountingInputStream(rawInStream, new AtomicLong(0L));
+                currentReadClaimStream = new ByteCountingInputStream(rawInStream);
                 StreamUtils.skip(currentReadClaimStream, offset);
 
                 // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
@@ -2270,8 +2270,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     writtenCount += footer.length;
                 }
             } finally {
-                bytesWritten.getAndAdd(writtenCount);
-                bytesRead.getAndAdd(readCount);
+                bytesWritten += writtenCount;
+                bytesRead += readCount;
             }
         } catch (final ContentNotFoundException nfe) {
             destroyContent(newClaim);
@@ -2311,8 +2311,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
 
+        long writtenToFlowFile = 0L;
         ContentClaim newClaim = null;
-        final AtomicLong writtenHolder = new AtomicLong(0L);
         try {
             newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
             claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
@@ -2320,9 +2320,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             ensureNotAppending(newClaim);
             try (final OutputStream stream = context.getContentRepository().write(newClaim);
                 final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
-                final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
-                recursionSet.add(source);
-                writer.process(new FlowFileAccessOutputStream(countingOut, source));
+                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
+                try {
+                    recursionSet.add(source);
+                    writer.process(new FlowFileAccessOutputStream(countingOut, source));
+                } finally {
+                    writtenToFlowFile = countingOut.getBytesWritten();
+                    bytesWritten += countingOut.getBytesWritten();
+                }
             } finally {
                 recursionSet.remove(source);
             }
@@ -2342,8 +2347,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             resetWriteClaims(); // need to reset write claim before we can remove the claim
             destroyContent(newClaim);
             throw t;
-        } finally {
-            bytesWritten.getAndAdd(writtenHolder.get());
         }
 
         removeTemporaryClaim(record);
@@ -2351,7 +2354,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             .fromFlowFile(record.getCurrent())
             .contentClaim(newClaim)
             .contentClaimOffset(0)
-            .size(writtenHolder.get())
+            .size(writtenToFlowFile)
             .build();
 
         record.setWorking(newFile);
@@ -2379,7 +2382,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                     final OutputStream rawOutStream = context.getContentRepository().write(newClaim);
                     final OutputStream bufferedOutStream = new BufferedOutputStream(rawOutStream);
-                    outStream = new ByteCountingOutputStream(bufferedOutStream, new AtomicLong(0L));
+                    outStream = new ByteCountingOutputStream(bufferedOutStream);
                     originalByteWrittenCount = 0;
 
                     appendableStreams.put(newClaim, outStream);
@@ -2448,7 +2451,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         } finally {
             if (outStream != null) {
                 final long bytesWrittenThisIteration = outStream.getBytesWritten() - originalByteWrittenCount;
-                bytesWritten.getAndAdd(bytesWrittenThisIteration);
+                bytesWritten += bytesWrittenThisIteration;
             }
         }
 
@@ -2542,8 +2545,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final StandardRepositoryRecord record = records.get(source);
         final ContentClaim currClaim = record.getCurrentClaim();
 
+        long writtenToFlowFile = 0L;
         ContentClaim newClaim = null;
-        final AtomicLong writtenHolder = new AtomicLong(0L);
         try {
             newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
             claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
@@ -2556,7 +2559,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
                 final OutputStream os = context.getContentRepository().write(newClaim);
                 final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
-                final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
+                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
 
                 recursionSet.add(source);
 
@@ -2574,6 +2577,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     cnfeThrown = true;
                     throw cnfe;
                 } finally {
+                    writtenToFlowFile = countingOut.getBytesWritten();
                     recursionSet.remove(source);
 
                    // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
@@ -2595,7 +2599,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             destroyContent(newClaim);
             throw t;
         } finally {
-            bytesWritten.getAndAdd(writtenHolder.get());
+            bytesWritten += writtenToFlowFile;
         }
 
         removeTemporaryClaim(record);
@@ -2603,7 +2607,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             .fromFlowFile(record.getCurrent())
             .contentClaim(newClaim)
             .contentClaimOffset(0L)
-            .size(writtenHolder.get())
+            .size(writtenToFlowFile)
             .build();
 
         record.setWorking(newFile);
@@ -2635,8 +2639,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         long newSize = 0L;
         try {
             newSize = context.getContentRepository().importFrom(source, newClaim);
-            bytesWritten.getAndAdd(newSize);
-            bytesRead.getAndAdd(newSize);
+            bytesWritten += newSize;
+            bytesRead += newSize;
         } catch (final Throwable t) {
             destroyContent(newClaim);
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
@@ -2671,7 +2675,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
 
                 newSize = context.getContentRepository().importFrom(source, newClaim);
-                bytesWritten.getAndAdd(newSize);
+                bytesWritten += newSize;
             } catch (final IOException e) {
                 throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
             }
@@ -2697,8 +2701,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             ensureNotAppending(record.getCurrentClaim());
 
             final long copyCount = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, append, record.getCurrentClaimOffset(), source.getSize());
-            bytesRead.getAndAdd(copyCount);
-            bytesWritten.getAndAdd(copyCount);
+            bytesRead += copyCount;
+            bytesWritten += copyCount;
         } catch (final ContentNotFoundException nfe) {
             handleContentNotFound(nfe, record);
         } catch (final Throwable t) {
@@ -3016,8 +3020,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             this.removedCount += session.removedCount;
             this.removedBytes += session.removedBytes;
-            this.bytesRead += session.bytesRead.get();
-            this.bytesWritten += session.bytesWritten.get();
+            this.bytesRead += session.bytesRead;
+            this.bytesWritten += session.bytesWritten;
             this.flowFilesIn += session.flowFilesIn;
             this.flowFilesOut += session.flowFilesOut;
             this.contentSizeIn += session.contentSizeIn;
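
The other change threaded through this file: the session-local counters drop their AtomicLong wrappers (presumably because a ProcessSession is used by a single thread at a time, so plain long fields suffice), and the counting streams move to the org.apache.nifi.stream.io variants, which keep their own tally and expose it via getBytesConsumed()/getBytesWritten() instead of mutating a caller-supplied AtomicLong. A minimal sketch of that style of counting stream, under a hypothetical name rather than the real ByteCountingOutputStream source:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CountingOutputStreamSketch extends FilterOutputStream {
    private long bytesWritten = 0L;

    public CountingOutputStreamSketch(final OutputStream out) {
        super(out);
    }

    @Override
    public void write(final int b) throws IOException {
        out.write(b);
        bytesWritten++;
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        out.write(b, off, len);    // bypass FilterOutputStream's one-byte-at-a-time loop
        bytesWritten += len;
    }

    public long getBytesWritten() {
        return bytesWritten;       // callers read the tally here; no shared AtomicLong needed
    }
}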

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 9c2a7d8..2a323de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,12 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -45,16 +40,12 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SerDe;
 import org.wali.SyncListener;
-import org.wali.UpdateType;
 import org.wali.WriteAheadRepository;
 
 /**
@@ -95,7 +86,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
-    private WriteAheadRecordSerde serde;
+    private RepositoryRecordSerdeFactory serdeFactory;
     private ResourceClaimManager claimManager;
 
     // WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk.
@@ -153,8 +144,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on
         // backup and then the data deleted from the normal location; then can move backup to normal location and
         // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
-        serde = new WriteAheadRecordSerde(claimManager);
-        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this);
+        serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
+        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serdeFactory, this);
     }
 
     @Override
@@ -319,6 +310,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             repoRecords.add(repoRecord);
         }
 
+        // TODO: We should probably update this to support bulk 'SWAP OUT' records. As-is, we have to write out a
+        // 'SWAP OUT' record for each FlowFile, which includes the Update Type, FlowFile ID, swap file location, and Queue ID.
+        // We could instead have a single record with Update Type of 'SWAP OUT' and just include the swap file location, Queue ID,
+        // and all FlowFile IDs.
         // update WALI to indicate that the records were swapped out.
         wal.update(repoRecords, true);
 
@@ -347,9 +342,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
             queueMap.put(queue.getIdentifier(), queue);
         }
-        serde.setQueueMap(queueMap);
+        serdeFactory.setQueueMap(queueMap);
         final Collection<RepositoryRecord> recordList = wal.recoverRecords();
-        serde.setQueueMap(null);
+        serdeFactory.setQueueMap(null);
 
         for (final RepositoryRecord record : recordList) {
             final ContentClaim claim = record.getCurrentClaim();
@@ -361,7 +356,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // Determine the next sequence number for FlowFiles
         long maxId = minimumSequenceNumber;
         for (final RepositoryRecord record : recordList) {
-            final long recordId = serde.getRecordIdentifier(record);
+            final long recordId = serdeFactory.getRecordIdentifier(record);
             if (recordId > maxId) {
                 maxId = recordId;
             }
@@ -414,526 +409,4 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     public int checkpoint() throws IOException {
         return wal.checkpoint();
     }
-
-    private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> {
-
-        private static final int CURRENT_ENCODING_VERSION = 9;
-
-        public static final byte ACTION_CREATE = 0;
-        public static final byte ACTION_UPDATE = 1;
-        public static final byte ACTION_DELETE = 2;
-        public static final byte ACTION_SWAPPED_OUT = 3;
-        public static final byte ACTION_SWAPPED_IN = 4;
-
-        private Map<String, FlowFileQueue> flowFileQueueMap = null;
-        private long recordsRestored = 0L;
-        private final ResourceClaimManager claimManager;
-
-        public WriteAheadRecordSerde(final ResourceClaimManager claimManager) {
-            this.claimManager = claimManager;
-        }
-
-        private void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
-            this.flowFileQueueMap = queueMap;
-        }
-
-        @Override
-        public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
-            serializeEdit(previousRecordState, record, out, false);
-        }
-
-        public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
-            if (record.isMarkedForAbort()) {
-                logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
-                out.write(ACTION_DELETE);
-                out.writeLong(getRecordIdentifier(record));
-                serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
-                return;
-            }
-
-            final UpdateType updateType = getUpdateType(record);
-
-            if (updateType.equals(UpdateType.DELETE)) {
-                out.write(ACTION_DELETE);
-                out.writeLong(getRecordIdentifier(record));
-                serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
-                return;
-            }
-
-            // If there's a Destination Connection, that's the one that we want to associated with this record.
-            // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
-            // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
-            // so we use the originalConnection instead
-            FlowFileQueue associatedQueue = record.getDestination();
-            if (associatedQueue == null) {
-                associatedQueue = record.getOriginalQueue();
-            }
-
-            if (updateType.equals(UpdateType.SWAP_OUT)) {
-                out.write(ACTION_SWAPPED_OUT);
-                out.writeLong(getRecordIdentifier(record));
-                out.writeUTF(associatedQueue.getIdentifier());
-                out.writeUTF(getLocation(record));
-                return;
-            }
-
-            final FlowFile flowFile = record.getCurrent();
-            final ContentClaim claim = record.getCurrentClaim();
-
-            switch (updateType) {
-                case UPDATE:
-                    out.write(ACTION_UPDATE);
-                    break;
-                case CREATE:
-                    out.write(ACTION_CREATE);
-                    break;
-                case SWAP_IN:
-                    out.write(ACTION_SWAPPED_IN);
-                    break;
-                default:
-                    throw new AssertionError();
-            }
-
-            out.writeLong(getRecordIdentifier(record));
-            out.writeLong(flowFile.getEntryDate());
-            out.writeLong(flowFile.getLineageStartDate());
-            out.writeLong(flowFile.getLineageStartIndex());
-
-            final Long queueDate = flowFile.getLastQueueDate();
-            out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
-            out.writeLong(flowFile.getQueueDateIndex());
-            out.writeLong(flowFile.getSize());
-
-            if (associatedQueue == null) {
-                logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
-                        new Object[]{this, record});
-                writeString("", out);
-            } else {
-                writeString(associatedQueue.getIdentifier(), out);
-            }
-
-            serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
-
-            if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
-                out.write(1);   // indicate attributes changed
-                final Map<String, String> attributes = flowFile.getAttributes();
-                out.writeInt(attributes.size());
-                for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                    writeString(entry.getKey(), out);
-                    writeString(entry.getValue(), out);
-                }
-            } else {
-                out.write(0);   // indicate attributes did not change
-            }
-
-            if (updateType == UpdateType.SWAP_IN) {
-                out.writeUTF(record.getSwapLocation());
-            }
-        }
-
-        @Override
-        public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
-            final int action = in.read();
-            final long recordId = in.readLong();
-            if (action == ACTION_DELETE) {
-                final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
-                if (version > 4) {
-                    deserializeClaim(in, version, ffBuilder);
-                }
-
-                final FlowFileRecord flowFileRecord = ffBuilder.build();
-                final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
-                record.markForDelete();
-
-                return record;
-            }
-
-            if (action == ACTION_SWAPPED_OUT) {
-                final String queueId = in.readUTF();
-                final String location = in.readUTF();
-                final FlowFileQueue queue = flowFileQueueMap.get(queueId);
-
-                final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                        .id(recordId)
-                        .build();
-
-                return new StandardRepositoryRecord(queue, flowFileRecord, location);
-            }
-
-            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-            final RepositoryRecord record = currentRecordStates.get(recordId);
-            ffBuilder.id(recordId);
-            if (record != null) {
-                ffBuilder.fromFlowFile(record.getCurrent());
-            }
-            ffBuilder.entryDate(in.readLong());
-
-            if (version > 1) {
-                // read the lineage identifiers and lineage start date, which were added in version 2.
-                if (version < 9) {
-                    final int numLineageIds = in.readInt();
-                    for (int i = 0; i < numLineageIds; i++) {
-                        in.readUTF(); //skip identifiers
-                    }
-                }
-                final long lineageStartDate = in.readLong();
-                final long lineageStartIndex;
-                if (version > 7) {
-                    lineageStartIndex = in.readLong();
-                } else {
-                    lineageStartIndex = 0L;
-                }
-                ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
-                if (version > 5) {
-                    final long lastQueueDate = in.readLong();
-                    final long queueDateIndex;
-                    if (version > 7) {
-                        queueDateIndex = in.readLong();
-                    } else {
-                        queueDateIndex = 0L;
-                    }
-
-                    ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-                }
-            }
-
-            ffBuilder.size(in.readLong());
-            final String connectionId = readString(in);
-
-            logger.debug("{} -> {}", new Object[]{recordId, connectionId});
-
-            deserializeClaim(in, version, ffBuilder);
-
-            // recover new attributes, if they changed
-            final int attributesChanged = in.read();
-            if (attributesChanged == -1) {
-                throw new EOFException();
-            } else if (attributesChanged == 1) {
-                final int numAttributes = in.readInt();
-                final Map<String, String> attributes = new HashMap<>();
-                for (int i = 0; i < numAttributes; i++) {
-                    final String key = readString(in);
-                    final String value = readString(in);
-                    attributes.put(key, value);
-                }
-
-                ffBuilder.addAttributes(attributes);
-            } else if (attributesChanged != 0) {
-                throw new IOException("Attribute Change Qualifier not found in stream; found value: "
-                        + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
-            }
-
-            final FlowFileRecord flowFile = ffBuilder.build();
-            String swapLocation = null;
-            if (action == ACTION_SWAPPED_IN) {
-                swapLocation = in.readUTF();
-            }
-
-            final StandardRepositoryRecord standardRepoRecord;
-
-            if (flowFileQueueMap == null) {
-                standardRepoRecord = new StandardRepositoryRecord(null, flowFile);
-            } else {
-                final FlowFileQueue queue = flowFileQueueMap.get(connectionId);
-                standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
-                if (swapLocation != null) {
-                    standardRepoRecord.setSwapLocation(swapLocation);
-                }
-
-                if (connectionId.isEmpty()) {
-                    logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile);
-                    standardRepoRecord.markForAbort();
-                } else if (queue == null) {
-                    logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId);
-                    standardRepoRecord.markForAbort();
-                }
-            }
-
-            recordsRestored++;
-            return standardRepoRecord;
-        }
-
-        @Override
-        public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
-            final int action = in.read();
-            if (action == -1) {
-                return null;
-            }
-
-            final long recordId = in.readLong();
-            if (action == ACTION_DELETE) {
-                final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
-                if (version > 4) {
-                    deserializeClaim(in, version, ffBuilder);
-                }
-
-                final FlowFileRecord flowFileRecord = ffBuilder.build();
-                final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
-                record.markForDelete();
-                return record;
-            }
-
-            // if action was not delete, it must be create/swap in
-            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-            final long entryDate = in.readLong();
-
-            if (version > 1) {
-                // read the lineage identifiers and lineage start date, which were added in version 2.
-                if (version < 9) {
-                    final int numLineageIds = in.readInt();
-                    for (int i = 0; i < numLineageIds; i++) {
-                        in.readUTF(); //skip identifiers
-                    }
-                }
-
-                final long lineageStartDate = in.readLong();
-                final long lineageStartIndex;
-                if (version > 7) {
-                    lineageStartIndex = in.readLong();
-                } else {
-                    lineageStartIndex = 0L;
-                }
-                ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
-                if (version > 5) {
-                    final long lastQueueDate = in.readLong();
-                    final long queueDateIndex;
-                    if (version > 7) {
-                        queueDateIndex = in.readLong();
-                    } else {
-                        queueDateIndex = 0L;
-                    }
-
-                    ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-                }
-            }
-
-            final long size = in.readLong();
-            final String connectionId = readString(in);
-
-            logger.debug("{} -> {}", new Object[]{recordId, connectionId});
-
-            ffBuilder.id(recordId);
-            ffBuilder.entryDate(entryDate);
-            ffBuilder.size(size);
-
-            deserializeClaim(in, version, ffBuilder);
-
-            final int attributesChanged = in.read();
-            if (attributesChanged == 1) {
-                final int numAttributes = in.readInt();
-                final Map<String, String> attributes = new HashMap<>();
-                for (int i = 0; i < numAttributes; i++) {
-                    final String key = readString(in);
-                    final String value = readString(in);
-                    attributes.put(key, value);
-                }
-
-                ffBuilder.addAttributes(attributes);
-            } else if (attributesChanged == -1) {
-                throw new EOFException();
-            } else if (attributesChanged != 0) {
-                throw new IOException("Attribute Change Qualifier not found in stream; found value: "
-                        + attributesChanged + " after successfully restoring " + recordsRestored + " records");
-            }
-
-            final FlowFileRecord flowFile = ffBuilder.build();
-            String swapLocation = null;
-            if (action == ACTION_SWAPPED_IN) {
-                swapLocation = in.readUTF();
-            }
-
-            final StandardRepositoryRecord record;
-
-            if (flowFileQueueMap == null) {
-                record = new StandardRepositoryRecord(null, flowFile);
-            } else {
-                final FlowFileQueue queue = flowFileQueueMap.get(connectionId);
-                record = new StandardRepositoryRecord(queue, flowFile);
-                if (swapLocation != null) {
-                    record.setSwapLocation(swapLocation);
-                }
-
-                if (connectionId.isEmpty()) {
-                    logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile);
-                    record.markForAbort();
-                } else if (queue == null) {
-                    logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId);
-                    record.markForAbort();
-                }
-            }
-
-            recordsRestored++;
-            return record;
-        }
-
-        @Override
-        public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
-            serializeEdit(null, record, out, true);
-        }
-
-        private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
-            if (claim == null) {
-                out.write(0);
-            } else {
-                out.write(1);
-
-                final ResourceClaim resourceClaim = claim.getResourceClaim();
-                writeString(resourceClaim.getId(), out);
-                writeString(resourceClaim.getContainer(), out);
-                writeString(resourceClaim.getSection(), out);
-                out.writeLong(claim.getOffset());
-                out.writeLong(claim.getLength());
-
-                out.writeLong(offset);
-                out.writeBoolean(resourceClaim.isLossTolerant());
-            }
-        }
-
-        private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
-            // determine current Content Claim.
-            final int claimExists = in.read();
-            if (claimExists == 1) {
-                final String claimId;
-                if (serializationVersion < 4) {
-                    claimId = String.valueOf(in.readLong());
-                } else {
-                    claimId = readString(in);
-                }
-
-                final String container = readString(in);
-                final String section = readString(in);
-
-                final long resourceOffset;
-                final long resourceLength;
-                if (serializationVersion < 7) {
-                    resourceOffset = 0L;
-                    resourceLength = -1L;
-                } else {
-                    resourceOffset = in.readLong();
-                    resourceLength = in.readLong();
-                }
-
-                final long claimOffset = in.readLong();
-
-                final boolean lossTolerant;
-                if (serializationVersion >= 3) {
-                    lossTolerant = in.readBoolean();
-                } else {
-                    lossTolerant = false;
-                }
-
-                final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
-                final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
-                contentClaim.setLength(resourceLength);
-
-                ffBuilder.contentClaim(contentClaim);
-                ffBuilder.contentClaimOffset(claimOffset);
-            } else if (claimExists == -1) {
-                throw new EOFException();
-            } else if (claimExists != 0) {
-                throw new IOException("Claim Existence Qualifier not found in stream; found value: "
-                        + claimExists + " after successfully restoring " + recordsRestored + " records");
-            }
-        }
-
-        private void writeString(final String toWrite, final OutputStream out) throws IOException {
-            final byte[] bytes = toWrite.getBytes("UTF-8");
-            final int utflen = bytes.length;
-
-            if (utflen < 65535) {
-                out.write(utflen >>> 8);
-                out.write(utflen);
-                out.write(bytes);
-            } else {
-                out.write(255);
-                out.write(255);
-                out.write(utflen >>> 24);
-                out.write(utflen >>> 16);
-                out.write(utflen >>> 8);
-                out.write(utflen);
-                out.write(bytes);
-            }
-        }
-
-        private String readString(final InputStream in) throws IOException {
-            final Integer numBytes = readFieldLength(in);
-            if (numBytes == null) {
-                throw new EOFException();
-            }
-            final byte[] bytes = new byte[numBytes];
-            fillBuffer(in, bytes, numBytes);
-            return new String(bytes, "UTF-8");
-        }
-
-        private Integer readFieldLength(final InputStream in) throws IOException {
-            final int firstValue = in.read();
-            final int secondValue = in.read();
-            if (firstValue < 0) {
-                return null;
-            }
-            if (secondValue < 0) {
-                throw new EOFException();
-            }
-            if (firstValue == 0xff && secondValue == 0xff) {
-                final int ch1 = in.read();
-                final int ch2 = in.read();
-                final int ch3 = in.read();
-                final int ch4 = in.read();
-                if ((ch1 | ch2 | ch3 | ch4) < 0) {
-                    throw new EOFException();
-                }
-                return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
-            } else {
-                return (firstValue << 8) + secondValue;
-            }
-        }
-
-        private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
-            int bytesRead;
-            int totalBytesRead = 0;
-            while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
-                totalBytesRead += bytesRead;
-            }
-            if (totalBytesRead != length) {
-                throw new EOFException();
-            }
-        }
-
-        @Override
-        public Long getRecordIdentifier(final RepositoryRecord record) {
-            return record.getCurrent().getId();
-        }
-
-        @Override
-        public UpdateType getUpdateType(final RepositoryRecord record) {
-            switch (record.getType()) {
-                case CONTENTMISSING:
-                case DELETE:
-                    return UpdateType.DELETE;
-                case CREATE:
-                    return UpdateType.CREATE;
-                case UPDATE:
-                    return UpdateType.UPDATE;
-                case SWAP_OUT:
-                    return UpdateType.SWAP_OUT;
-                case SWAP_IN:
-                    return UpdateType.SWAP_IN;
-            }
-            return null;
-        }
-
-        @Override
-        public int getVersion() {
-            return CURRENT_ENCODING_VERSION;
-        }
-
-        @Override
-        public String getLocation(final RepositoryRecord record) {
-            return record.getSwapLocation();
-        }
-    }
 }
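
The WriteAheadRecordSerde deleted above carried its own length-prefixed string framing (writeString/readFieldLength), and the new WriteAheadRepositoryRecordSerde below still calls writeString, so the format is worth spelling out: lengths 0 through 65534 are written as a two-byte big-endian prefix, and anything longer is escaped with the marker 0xFF 0xFF followed by a four-byte length. A condensed sketch of the write side, using DataOutputStream helpers for brevity:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

static byte[] frame(final String value) throws IOException {
    final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(buffer);
    if (bytes.length < 65535) {
        out.writeShort(bytes.length);    // two-byte big-endian length, 0..65534
    } else {
        out.writeShort(0xFFFF);          // escape marker reserved for long strings
        out.writeInt(bytes.length);      // then the real length as four bytes
    }
    out.write(bytes);
    return buffer.toByteArray();         // frame("hi") -> 00 02 68 69
}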

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
new file mode 100644
index 0000000..e8ce44e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+    private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
+
+    private static final int CURRENT_ENCODING_VERSION = 9;
+
+    public static final byte ACTION_CREATE = 0;
+    public static final byte ACTION_UPDATE = 1;
+    public static final byte ACTION_DELETE = 2;
+    public static final byte ACTION_SWAPPED_OUT = 3;
+    public static final byte ACTION_SWAPPED_IN = 4;
+
+    private long recordsRestored = 0L;
+    private final ResourceClaimManager claimManager;
+
+    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
+        this.claimManager = claimManager;
+    }
+
+    @Override
+    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        serializeEdit(previousRecordState, record, out, false);
+    }
+
+    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
+        if (record.isMarkedForAbort()) {
+            logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
+            out.write(ACTION_DELETE);
+            out.writeLong(getRecordIdentifier(record));
+            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
+            return;
+        }
+
+        final UpdateType updateType = getUpdateType(record);
+
+        if (updateType.equals(UpdateType.DELETE)) {
+            out.write(ACTION_DELETE);
+            out.writeLong(getRecordIdentifier(record));
+            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
+            return;
+        }
+
+        // If there is a Destination Connection, that is the one we want to associate with this record.
+        // However, on restart, we restore the FlowFile and set this connection to its "originalConnection".
+        // If we then serialize the FlowFile again before it is transferred, we still need a queue to
+        // associate it with, so we fall back to the originalConnection (the original queue).
+        FlowFileQueue associatedQueue = record.getDestination();
+        if (associatedQueue == null) {
+            associatedQueue = record.getOriginalQueue();
+        }
+
+        if (updateType.equals(UpdateType.SWAP_OUT)) {
+            out.write(ACTION_SWAPPED_OUT);
+            out.writeLong(getRecordIdentifier(record));
+            out.writeUTF(associatedQueue.getIdentifier());
+            out.writeUTF(getLocation(record));
+            return;
+        }
+
+        final FlowFile flowFile = record.getCurrent();
+        final ContentClaim claim = record.getCurrentClaim();
+
+        switch (updateType) {
+            case UPDATE:
+                out.write(ACTION_UPDATE);
+                break;
+            case CREATE:
+                out.write(ACTION_CREATE);
+                break;
+            case SWAP_IN:
+                out.write(ACTION_SWAPPED_IN);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
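+        // Common layout for CREATE/UPDATE/SWAP_IN from here on: record id, entry date,
+        // lineage start date and index, last-queue date and index, size, queue identifier,
+        // content claim, attribute-change flag (plus attributes when set), and finally
+        // the swap location for SWAP_IN records.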
+        out.writeLong(getRecordIdentifier(record));
+        out.writeLong(flowFile.getEntryDate());
+        out.writeLong(flowFile.getLineageStartDate());
+        out.writeLong(flowFile.getLineageStartIndex());
+
+        final Long queueDate = flowFile.getLastQueueDate();
+        out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
+        out.writeLong(flowFile.getQueueDateIndex());
+        out.writeLong(flowFile.getSize());
+
+        if (associatedQueue == null) {
+            logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
+                new Object[] {this, record});
+            writeString("", out);
+        } else {
+            writeString(associatedQueue.getIdentifier(), out);
+        }
+
+        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
+
+        if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
+            out.write(1);   // indicate attributes changed
+            final Map<String, String> attributes = flowFile.getAttributes();
+            out.writeInt(attributes.size());
+            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+                writeString(entry.getKey(), out);
+                writeString(entry.getValue(), out);
+            }
+        } else {
+            out.write(0);   // indicate attributes did not change
+        }
+
+        if (updateType == UpdateType.SWAP_IN) {
+            out.writeUTF(record.getSwapLocation());
+        }
+    }
+
+    @Override
+    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
+        final int action = in.read();
+        final long recordId = in.readLong();
+        if (action == ACTION_DELETE) {
+            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
+
+            if (version > 4) {
+                deserializeClaim(in, version, ffBuilder);
+            }
+
+            final FlowFileRecord flowFileRecord = ffBuilder.build();
+            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+            record.markForDelete();
+
+            return record;
+        }
+
+        if (action == ACTION_SWAPPED_OUT) {
+            final String queueId = in.readUTF();
+            final String location = in.readUTF();
+            final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+                .id(recordId)
+                .build();
+
+            return new StandardRepositoryRecord(queue, flowFileRecord, location);
+        }
+
+        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        final RepositoryRecord record = currentRecordStates.get(recordId);
+        ffBuilder.id(recordId);
+        if (record != null) {
+            ffBuilder.fromFlowFile(record.getCurrent());
+        }
+        ffBuilder.entryDate(in.readLong());
+
+        if (version > 1) {
+            // read the lineage identifiers and lineage start date, which were added in version 2.
+            if (version < 9) {
+                final int numLineageIds = in.readInt();
+                for (int i = 0; i < numLineageIds; i++) {
+                    in.readUTF(); //skip identifiers
+                }
+            }
+            final long lineageStartDate = in.readLong();
+            final long lineageStartIndex;
+            if (version > 7) {
+                lineageStartIndex = in.readLong();
+            } else {
+                lineageStartIndex = 0L;
+            }
+            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+            if (version > 5) {
+                final long lastQueueDate = in.readLong();
+                final long queueDateIndex;
+                if (version > 7) {
+                    queueDateIndex = in.readLong();
+                } else {
+                    queueDateIndex = 0L;
+                }
+
+                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+            }
+        }
+
+        ffBuilder.size(in.readLong());
+        final String connectionId = readString(in);
+
+        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
+
+        deserializeClaim(in, version, ffBuilder);
+
+        // recover new attributes, if they changed
+        final int attributesChanged = in.read();
+        if (attributesChanged == -1) {
+            throw new EOFException();
+        } else if (attributesChanged == 1) {
+            final int numAttributes = in.readInt();
+            final Map<String, String> attributes = new HashMap<>();
+            for (int i = 0; i < numAttributes; i++) {
+                final String key = readString(in);
+                final String value = readString(in);
+                attributes.put(key, value);
+            }
+
+            ffBuilder.addAttributes(attributes);
+        } else if (attributesChanged != 0) {
+            throw new IOException("Attribute Change Qualifier not found in stream; found value: "
+                + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
+        }
+
+        final FlowFileRecord flowFile = ffBuilder.build();
+        String swapLocation = null;
+        if (action == ACTION_SWAPPED_IN) {
+            swapLocation = in.readUTF();
+        }
+
+        final FlowFileQueue queue = getFlowFileQueue(connectionId);
+        final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
+        if (swapLocation != null) {
+            standardRepoRecord.setSwapLocation(swapLocation);
+        }
+
+        if (connectionId.isEmpty()) {
+            logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile);
+            standardRepoRecord.markForAbort();
+        } else if (queue == null) {
+            logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId);
+            standardRepoRecord.markForAbort();
+        }
+
+        recordsRestored++;
+        return standardRepoRecord;
+    }
+
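+    // Unlike deserializeEdit, this reads a complete, self-contained record (the layout
+    // written by serializeRecord below, which forces attributes), so no map of prior
+    // record states is needed.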
+    @Override
+    public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
+        final int action = in.read();
+        if (action == -1) {
+            return null;
+        }
+
+        final long recordId = in.readLong();
+        if (action == ACTION_DELETE) {
+            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
+
+            if (version > 4) {
+                deserializeClaim(in, version, ffBuilder);
+            }
+
+            final FlowFileRecord flowFileRecord = ffBuilder.build();
+            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+            record.markForDelete();
+            return record;
+        }
+
+        // action was not DELETE, so it is CREATE, UPDATE, or SWAP_IN; all share the same layout below
+        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+        final long entryDate = in.readLong();
+
+        if (version > 1) {
+            // read the lineage identifiers and lineage start date, which were added in version 2.
+            if (version < 9) {
+                final int numLineageIds = in.readInt();
+                for (int i = 0; i < numLineageIds; i++) {
+                    in.readUTF(); //skip identifiers
+                }
+            }
+
+            final long lineageStartDate = in.readLong();
+            final long lineageStartIndex;
+            if (version > 7) {
+                lineageStartIndex = in.readLong();
+            } else {
+                lineageStartIndex = 0L;
+            }
+            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+            if (version > 5) {
+                final long lastQueueDate = in.readLong();
+                final long queueDateIndex;
+                if (version > 7) {
+                    queueDateIndex = in.readLong();
+                } else {
+                    queueDateIndex = 0L;
+                }
+
+                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+            }
+        }
+
+        final long size = in.readLong();
+        final String connectionId = readString(in);
+
+        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
+
+        ffBuilder.id(recordId);
+        ffBuilder.entryDate(entryDate);
+        ffBuilder.size(size);
+
+        deserializeClaim(in, version, ffBuilder);
+
+        final int attributesChanged = in.read();
+        if (attributesChanged == 1) {
+            final int numAttributes = in.readInt();
+            final Map<String, String> attributes = new HashMap<>();
+            for (int i = 0; i < numAttributes; i++) {
+                final String key = readString(in);
+                final String value = readString(in);
+                attributes.put(key, value);
+            }
+
+            ffBuilder.addAttributes(attributes);
+        } else if (attributesChanged == -1) {
+            throw new EOFException();
+        } else if (attributesChanged != 0) {
+            throw new IOException("Attribute Change Qualifier not found in stream; found value: "
+                + attributesChanged + " after successfully restoring " + recordsRestored + " records");
+        }
+
+        final FlowFileRecord flowFile = ffBuilder.build();
+        String swapLocation = null;
+        if (action == ACTION_SWAPPED_IN) {
+            swapLocation = in.readUTF();
+        }
+
+        final StandardRepositoryRecord record;
+        final FlowFileQueue queue = getFlowFileQueue(connectionId);
+        record = new StandardRepositoryRecord(queue, flowFile);
+        if (swapLocation != null) {
+            record.setSwapLocation(swapLocation);
+        }
+
+        if (connectionId.isEmpty()) {
+            logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile);
+            record.markForAbort();
+        } else if (queue == null) {
+            logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId);
+            record.markForAbort();
+        }
+
+        recordsRestored++;
+        return record;
+    }
+
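+    // previousRecordState is never used by serializeEdit, so passing null is safe;
+    // forcing attributes makes the serialized record fully self-contained.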
+    @Override
+    public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        serializeEdit(null, record, out, true);
+    }
+
+    private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
+        if (claim == null) {
+            out.write(0);
+        } else {
+            out.write(1);
+
+            final ResourceClaim resourceClaim = claim.getResourceClaim();
+            writeString(resourceClaim.getId(), out);
+            writeString(resourceClaim.getContainer(), out);
+            writeString(resourceClaim.getSection(), out);
+            out.writeLong(claim.getOffset());
+            out.writeLong(claim.getLength());
+
+            out.writeLong(offset);
+            out.writeBoolean(resourceClaim.isLossTolerant());
+        }
+    }
+
+    private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
+        // determine current Content Claim.
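+        // Marker byte: 1 means a claim follows, 0 means no claim; -1 signals EOF and
+        // any other value indicates a corrupt stream.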
+        final int claimExists = in.read();
+        if (claimExists == 1) {
+            final String claimId;
+            if (serializationVersion < 4) {
+                claimId = String.valueOf(in.readLong());
+            } else {
+                claimId = readString(in);
+            }
+
+            final String container = readString(in);
+            final String section = readString(in);
+
+            final long resourceOffset;
+            final long resourceLength;
+            if (serializationVersion < 7) {
+                resourceOffset = 0L;
+                resourceLength = -1L;
+            } else {
+                resourceOffset = in.readLong();
+                resourceLength = in.readLong();
+            }
+
+            final long claimOffset = in.readLong();
+
+            final boolean lossTolerant;
+            if (serializationVersion >= 3) {
+                lossTolerant = in.readBoolean();
+            } else {
+                lossTolerant = false;
+            }
+
+            final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
+            final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
+            contentClaim.setLength(resourceLength);
+
+            ffBuilder.contentClaim(contentClaim);
+            ffBuilder.contentClaimOffset(claimOffset);
+        } else if (claimExists == -1) {
+            throw new EOFException();
+        } else if (claimExists != 0) {
+            throw new IOException("Claim Existence Qualifier not found in stream; found value: "
+                + claimExists + " after successfully restoring " + recordsRestored + " records");
+        }
+    }
+
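+    // Length-prefixed UTF-8: lengths under 65535 are written big-endian in two bytes;
+    // longer strings write the escape 0xFF 0xFF followed by a four-byte length.
+    // readString/readFieldLength below mirror this scheme.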
+    private void writeString(final String toWrite, final OutputStream out) throws IOException {
+        final byte[] bytes = toWrite.getBytes("UTF-8");
+        final int utflen = bytes.length;
+
+        if (utflen < 65535) {
+            out.write(utflen >>> 8);
+            out.write(utflen);
+            out.write(bytes);
+        } else {
+            out.write(255);
+            out.write(255);
+            out.write(utflen >>> 24);
+            out.write(utflen >>> 16);
+            out.write(utflen >>> 8);
+            out.write(utflen);
+            out.write(bytes);
+        }
+    }
+
+    private String readString(final InputStream in) throws IOException {
+        final Integer numBytes = readFieldLength(in);
+        if (numBytes == null) {
+            throw new EOFException();
+        }
+        final byte[] bytes = new byte[numBytes];
+        fillBuffer(in, bytes, numBytes);
+        return new String(bytes, "UTF-8");
+    }
+
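+    // Mirrors writeString: returns null if the stream is exhausted before a field
+    // begins; a two-byte value of 0xFFFF means a four-byte length follows.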
+    private Integer readFieldLength(final InputStream in) throws IOException {
+        final int firstValue = in.read();
+        final int secondValue = in.read();
+        if (firstValue < 0) {
+            return null;
+        }
+        if (secondValue < 0) {
+            throw new EOFException();
+        }
+        if (firstValue == 0xff && secondValue == 0xff) {
+            final int ch1 = in.read();
+            final int ch2 = in.read();
+            final int ch3 = in.read();
+            final int ch4 = in.read();
+            if ((ch1 | ch2 | ch3 | ch4) < 0) {
+                throw new EOFException();
+            }
+            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
+        } else {
+            return (firstValue << 8) + secondValue;
+        }
+    }
+
+    private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
+        int bytesRead;
+        int totalBytesRead = 0;
+        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
+            totalBytesRead += bytesRead;
+        }
+        if (totalBytesRead != length) {
+            throw new EOFException();
+        }
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_ENCODING_VERSION;
+    }
+}
\ No newline at end of file
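
For reference, the writeString/readString pair above implements a simple
length-prefixed UTF-8 encoding. Below is a minimal, self-contained sketch of
the same scheme, assuming only what is visible in the diff (the EncodingSketch
class is illustrative and not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class EncodingSketch {

        // Lengths under 65535 are written big-endian in two bytes; longer strings
        // write the escape 0xFF 0xFF followed by a four-byte length.
        static void writeString(final String toWrite, final OutputStream out) throws IOException {
            final byte[] bytes = toWrite.getBytes("UTF-8");
            final int len = bytes.length;
            if (len < 65535) {
                out.write(len >>> 8);
                out.write(len);
            } else {
                out.write(255);
                out.write(255);
                out.write(len >>> 24);
                out.write(len >>> 16);
                out.write(len >>> 8);
                out.write(len);
            }
            out.write(bytes);
        }

        static String readString(final InputStream in) throws IOException {
            final int first = in.read();
            final int second = in.read();
            if (first < 0 || second < 0) {
                throw new EOFException();
            }
            int length = (first << 8) + second;
            if (length == 0xFFFF) {
                // escape value: the real length follows as four bytes
                length = 0;
                for (int i = 0; i < 4; i++) {
                    final int b = in.read();
                    if (b < 0) {
                        throw new EOFException();
                    }
                    length = (length << 8) + b;
                }
            }
            final byte[] bytes = new byte[length];
            int total = 0;
            while (total < length) {
                final int n = in.read(bytes, total, length - total);
                if (n < 0) {
                    throw new EOFException();
                }
                total += n;
            }
            return new String(bytes, "UTF-8");
        }

        public static void main(final String[] args) throws IOException {
            final ByteArrayOutputStream bos = new ByteArrayOutputStream();
            writeString("hello", bos);
            System.out.println(readString(new ByteArrayInputStream(bos.toByteArray()))); // prints "hello"
        }
    }

The 0xFFFF escape keeps the common case at two bytes of overhead while still
permitting strings longer than 65,534 UTF-8 bytes.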

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
index 25dbaee..7e87199 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -64,29 +64,6 @@ public class StandardResourceClaim implements ResourceClaim, Comparable<Resource
         return section;
     }
 
-    /**
-     * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
-     *
-     * @param other other claim
- * @return x such that x <= -1 if this is less than other;
-     *         x=0 if this.equals(other);
-     *         x >= 1 if this is greater than other
-     */
-    @Override
-    public int compareTo(final ResourceClaim other) {
-        final int idComparison = id.compareTo(other.getId());
-        if (idComparison != 0) {
-            return idComparison;
-        }
-
-        final int containerComparison = container.compareTo(other.getContainer());
-        if (containerComparison != 0) {
-            return containerComparison;
-        }
-
-        return section.compareTo(other.getSection());
-    }
-
     @Override
     public boolean equals(final Object other) {
         if (this == other) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 7d554b1..e4f060e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -29,10 +29,9 @@ import org.slf4j.LoggerFactory;
 
 public class StandardResourceClaimManager implements ResourceClaimManager {
 
-    private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
     private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
-
-    private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
+    private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
+    private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
 
     @Override
     public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
@@ -50,7 +49,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
         return (count == null) ? null : count.getClaim();
     }
 
-    private static AtomicInteger getCounter(final ResourceClaim claim) {
+    private AtomicInteger getCounter(final ResourceClaim claim) {
         if (claim == null) {
             return null;
         }
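
The hunk above converts the claimant-count map and destructable-claims queue
from static (JVM-wide) fields to instance fields, so separate manager
instances no longer share state. A minimal sketch of the observable
difference, assuming ResourceClaimManager exposes incrementClaimantCount and
getClaimantCount as used elsewhere in the codebase (the container/section/id
values here are placeholders):

    import org.apache.nifi.controller.repository.claim.ResourceClaim;
    import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
    import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;

    public class ClaimScopeSketch {
        public static void main(final String[] args) {
            final ResourceClaimManager managerA = new StandardResourceClaimManager();
            final ResourceClaimManager managerB = new StandardResourceClaimManager();

            final ResourceClaim claim = managerA.newResourceClaim("container-1", "section-1", "claim-1", false, false);
            managerA.incrementClaimantCount(claim);

            // With instance fields the count is scoped to managerA; with the old
            // static map, managerB would have reported 1 as well.
            System.out.println(managerA.getClaimantCount(claim)); // 1
            System.out.println(managerB.getClaimantCount(claim)); // 0
        }
    }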

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java
deleted file mode 100644
index 7de25ac..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ByteCountingInputStream extends InputStream {
-
-    private final AtomicLong bytesReadHolder;
-    private final InputStream in;
-    private long bytesSkipped = 0L;
-
-    public ByteCountingInputStream(final InputStream in, final AtomicLong longHolder) {
-        this.in = in;
-        this.bytesReadHolder = longHolder;
-    }
-
-    @Override
-    public int read() throws IOException {
-        final int fromSuper = in.read();
-        if (fromSuper >= 0) {
-            bytesReadHolder.getAndIncrement();
-        }
-        return fromSuper;
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        final int fromSuper = in.read(b, off, len);
-        if (fromSuper >= 0) {
-            bytesReadHolder.getAndAdd(fromSuper);
-        }
-
-        return fromSuper;
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        return read(b, 0, b.length);
-    }
-
-    @Override
-    public long skip(final long n) throws IOException {
-        final long skipped = in.skip(n);
-        bytesSkipped += skipped;
-        return skipped;
-    }
-
-    @Override
-    public int available() throws IOException {
-        return in.available();
-    }
-
-    @Override
-    public void mark(int readlimit) {
-        in.mark(readlimit);
-    }
-
-    @Override
-    public boolean markSupported() {
-        return in.markSupported();
-    }
-
-    @Override
-    public void reset() throws IOException {
-        in.reset();
-    }
-
-    @Override
-    public void close() throws IOException {
-        in.close();
-    }
-
-    public long getBytesRead() {
-        return bytesReadHolder.get();
-    }
-
-    public long getBytesSkipped() {
-        return bytesSkipped;
-    }
-
-    public long getStreamLocation() {
-        return getBytesRead() + getBytesSkipped();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
deleted file mode 100644
index 7c778a2..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ByteCountingOutputStream extends OutputStream {
-
-    private final AtomicLong bytesWrittenHolder;
-    private final OutputStream out;
-
-    public ByteCountingOutputStream(final OutputStream out, final AtomicLong longHolder) {
-        this.out = out;
-        this.bytesWrittenHolder = longHolder;
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-        out.write(b);
-        bytesWrittenHolder.getAndIncrement();
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-        write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        out.write(b, off, len);
-        bytesWrittenHolder.getAndAdd(len);
-    }
-
-    public long getBytesWritten() {
-        return bytesWrittenHolder.get();
-    }
-
-    @Override
-    public void flush() throws IOException {
-        out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-        out.close();
-    }
-}