You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:06 UTC

[6/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
index 2afaa70..324f59f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -16,19 +16,445 @@
  */
 package org.apache.nifi.stream.io;
 
+import java.io.IOException;
 import java.io.InputStream;
 
 /**
  * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
  * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.
  */
-public class BufferedInputStream extends java.io.BufferedInputStream {
+public class BufferedInputStream extends InputStream {
 
-    public BufferedInputStream(final InputStream in) {
-        super(in);
+    private final InputStream in;
+
+    private static int DEFAULT_BUFFER_SIZE = 8192;
+
+    /**
+     * The maximum size of array to allocate.
+     * Some VMs reserve some header words in an array.
+     * Attempts to allocate larger arrays may result in
+     * OutOfMemoryError: Requested array size exceeds VM limit
+     */
+    private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+    /**
+     * The internal buffer array where the data is stored. When necessary,
+     * it may be replaced by another array of
+     * a different size.
+     */
+    protected byte buf[];
+
+    /**
+     * The index one greater than the index of the last valid byte in
+     * the buffer.
+     * This value is always
+     * in the range <code>0</code> through <code>buf.length</code>;
+     * elements <code>buf[0]</code> through <code>buf[count-1]
+     * </code>contain buffered input data obtained
+     * from the underlying input stream.
+     */
+    private int count;
+
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code>
+     * through <code>count</code>. If it is less
+     * than <code>count</code>, then <code>buf[pos]</code>
+     * is the next byte to be supplied as input;
+     * if it is equal to <code>count</code>, then
+     * the next <code>read</code> or <code>skip</code>
+     * operation will require more bytes to be
+     * read from the contained input stream.
+     *
+     * @see java.io.BufferedInputStream#buf
+     */
+    private int pos;
+
+    /**
+     * The value of the <code>pos</code> field at the time the last
+     * <code>mark</code> method was called.
+     * <p>
+     * This value is always
+     * in the range <code>-1</code> through <code>pos</code>.
+     * If there is no marked position in the input
+     * stream, this field is <code>-1</code>. If
+     * there is a marked position in the input
+     * stream, then <code>buf[markpos]</code>
+     * is the first byte to be supplied as input
+     * after a <code>reset</code> operation. If
+     * <code>markpos</code> is not <code>-1</code>,
+     * then all bytes from positions <code>buf[markpos]</code>
+     * through <code>buf[pos-1]</code> must remain
+     * in the buffer array (though they may be
+     * moved to another place in the buffer array,
+     * with suitable adjustments to the values
+     * of <code>count</code>, <code>pos</code>,
+     * and <code>markpos</code>); they may not
+     * be discarded unless and until the difference
+     * between <code>pos</code> and <code>markpos</code>
+     * exceeds <code>marklimit</code>.
+     *
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#pos
+     */
+    protected int markpos = -1;
+
+    /**
+     * The maximum read ahead allowed after a call to the
+     * <code>mark</code> method before subsequent calls to the
+     * <code>reset</code> method fail.
+     * Whenever the difference between <code>pos</code>
+     * and <code>markpos</code> exceeds <code>marklimit</code>,
+     * then the mark may be dropped by setting
+     * <code>markpos</code> to <code>-1</code>.
+     *
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#reset()
+     */
+    protected int marklimit;
+
+    /**
+     * Check to make sure that underlying input stream has not been
+     * nulled out due to close; if not return it;
+     */
+    private InputStream getInIfOpen() throws IOException {
+        InputStream input = in;
+        if (input == null) {
+            throw new IOException("Stream closed");
+        }
+        return input;
+    }
+
+    /**
+     * Check to make sure that buffer has not been nulled out due to
+     * close; if not return it;
+     */
+    private byte[] getBufIfOpen() throws IOException {
+        if (buf == null) {
+            throw new IOException("Stream closed");
+        }
+        return buf;
+    }
+
+    /**
+     * Creates a <code>BufferedInputStream</code>
+     * and saves its argument, the input stream
+     * <code>in</code>, for later use. An internal
+     * buffer array is created and stored in <code>buf</code>.
+     *
+     * @param in the underlying input stream.
+     */
+    public BufferedInputStream(InputStream in) {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a <code>BufferedInputStream</code>
+     * with the specified buffer size,
+     * and saves its argument, the input stream
+     * <code>in</code>, for later use. An internal
+     * buffer array of length <code>size</code>
+     * is created and stored in <code>buf</code>.
+     *
+     * @param in the underlying input stream.
+     * @param size the buffer size.
+     * @exception IllegalArgumentException if {@code size <= 0}.
+     */
+    public BufferedInputStream(InputStream in, int size) {
+        this.in = in;
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Fills the buffer with more data, taking into account
+     * shuffling and other tricks for dealing with marks.
+     * Assumes that it is being called by a synchronized method.
+     * This method also assumes that all data has already been read in,
+     * hence pos > count.
+     */
+    private void fill() throws IOException {
+        byte[] buffer = getBufIfOpen();
+        if (markpos < 0) {
+            pos = 0; /* no mark: throw away the buffer */
+        } else if (pos >= buffer.length) {
+            if (markpos > 0) {  /* can throw away early part of the buffer */
+                int sz = pos - markpos;
+                System.arraycopy(buffer, markpos, buffer, 0, sz);
+                pos = sz;
+                markpos = 0;
+            } else if (buffer.length >= marklimit) {
+                markpos = -1;   /* buffer got too big, invalidate mark */
+                pos = 0;        /* drop buffer contents */
+            } else if (buffer.length >= MAX_BUFFER_SIZE) {
+                throw new OutOfMemoryError("Required array size too large");
+            } else {            /* grow buffer */
+                int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? pos * 2 : MAX_BUFFER_SIZE;
+                if (nsz > marklimit) {
+                    nsz = marklimit;
+                }
+                byte nbuf[] = new byte[nsz];
+                System.arraycopy(buffer, 0, nbuf, 0, pos);
+                buffer = nbuf;
+            }
+        }
+        count = pos;
+        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+        if (n > 0) {
+            count = n + pos;
+        }
+    }
+
+    /**
+     * See
+     * the general contract of the <code>read</code>
+     * method of <code>InputStream</code>.
+     *
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     * @exception IOException if this input stream has been closed by
+     *                invoking its {@link #close()} method,
+     *                or an I/O error occurs.
+     * @see java.io.FilterInputStream#in
+     */
+    @Override
+    public int read() throws IOException {
+        if (pos >= count) {
+            fill();
+            if (pos >= count) {
+                return -1;
+            }
+        }
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+
+    /**
+     * Read characters into a portion of an array, reading from the underlying
+     * stream at most once if necessary.
+     */
+    private int read1(byte[] b, int off, int len) throws IOException {
+        int avail = count - pos;
+        if (avail <= 0) {
+            /*
+             * If the requested length is at least as large as the buffer, and
+             * if there is no mark/reset activity, do not bother to copy the
+             * bytes into the local buffer. In this way buffered streams will
+             * cascade harmlessly.
+             */
+            if (len >= getBufIfOpen().length && markpos < 0) {
+                return getInIfOpen().read(b, off, len);
+            }
+            fill();
+            avail = count - pos;
+            if (avail <= 0) {
+                return -1;
+            }
+        }
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+        pos += cnt;
+        return cnt;
+    }
+
+    /**
+     * Reads bytes from this byte-input stream into the specified byte array,
+     * starting at the given offset.
+     *
+     * <p>
+     * This method implements the general contract of the corresponding
+     * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+     * the <code>{@link InputStream}</code> class. As an additional
+     * convenience, it attempts to read as many bytes as possible by repeatedly
+     * invoking the <code>read</code> method of the underlying stream. This
+     * iterated <code>read</code> continues until one of the following
+     * conditions becomes true:
+     * <ul>
+     *
+     * <li>The specified number of bytes have been read,
+     *
+     * <li>The <code>read</code> method of the underlying stream returns
+     * <code>-1</code>, indicating end-of-file, or
+     *
+     * <li>The <code>available</code> method of the underlying stream
+     * returns zero, indicating that further input requests would block.
+     *
+     * </ul>
+     * If the first <code>read</code> on the underlying stream returns
+     * <code>-1</code> to indicate end-of-file then this method returns
+     * <code>-1</code>. Otherwise this method returns the number of bytes
+     * actually read.
+     *
+     * <p>
+     * Subclasses of this class are encouraged, but not required, to
+     * attempt to read as many bytes as possible in the same fashion.
+     *
+     * @param b destination buffer.
+     * @param off offset at which to start storing bytes.
+     * @param len maximum number of bytes to read.
+     * @return the number of bytes read, or <code>-1</code> if the end of
+     *         the stream has been reached.
+     * @exception IOException if this input stream has been closed by
+     *                invoking its {@link #close()} method,
+     *                or an I/O error occurs.
+     */
+    @Override
+    public int read(byte b[], int off, int len)
+        throws IOException {
+        getBufIfOpen(); // Check for closed stream
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        int n = 0;
+        for (;;) {
+            int nread = read1(b, off + n, len - n);
+            if (nread <= 0) {
+                return (n == 0) ? nread : n;
+            }
+            n += nread;
+            if (n >= len) {
+                return n;
+            }
+            // if not closed but no bytes available, return
+            InputStream input = in;
+            if (input != null && input.available() <= 0) {
+                return n;
+            }
+        }
+    }
+
+    /**
+     * See the general contract of the <code>skip</code>
+     * method of <code>InputStream</code>.
+     *
+     * @exception IOException if the stream does not support seek,
+     *                or if this input stream has been closed by
+     *                invoking its {@link #close()} method, or an
+     *                I/O error occurs.
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        getBufIfOpen(); // Check for closed stream
+        if (n <= 0) {
+            return 0;
+        }
+        long avail = count - pos;
+
+        if (avail <= 0) {
+            // If no mark position set then don't keep in buffer
+            if (markpos < 0) {
+                return getInIfOpen().skip(n);
+            }
+
+            // Fill in buffer to save bytes for reset
+            fill();
+            avail = count - pos;
+            if (avail <= 0) {
+                return 0;
+            }
+        }
+
+        long skipped = (avail < n) ? avail : n;
+        pos += skipped;
+        return skipped;
+    }
+
+    /**
+     * Returns an estimate of the number of bytes that can be read (or
+     * skipped over) from this input stream without blocking by the next
+     * invocation of a method for this input stream. The next invocation might be
+     * the same thread or another thread. A single read or skip of this
+     * many bytes will not block, but may read or skip fewer bytes.
+     * <p>
+     * This method returns the sum of the number of bytes remaining to be read in
+     * the buffer (<code>count&nbsp;- pos</code>) and the result of calling the
+     * {@link java.io.FilterInputStream#in in}.available().
+     *
+     * @return an estimate of the number of bytes that can be read (or skipped
+     *         over) from this input stream without blocking.
+     * @exception IOException if this input stream has been closed by
+     *                invoking its {@link #close()} method,
+     *                or an I/O error occurs.
+     */
+    @Override
+    public int available() throws IOException {
+        int n = count - pos;
+        int avail = getInIfOpen().available();
+        return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail;
+    }
+
+    /**
+     * See the general contract of the <code>mark</code>
+     * method of <code>InputStream</code>.
+     *
+     * @param readlimit the maximum limit of bytes that can be read before
+     *            the mark position becomes invalid.
+     * @see java.io.BufferedInputStream#reset()
+     */
+    @Override
+    public void mark(int readlimit) {
+        marklimit = readlimit;
+        markpos = pos;
+    }
+
+    /**
+     * See the general contract of the <code>reset</code>
+     * method of <code>InputStream</code>.
+     * <p>
+     * If <code>markpos</code> is <code>-1</code>
+     * (no mark has been set or the mark has been
+     * invalidated), an <code>IOException</code>
+     * is thrown. Otherwise, <code>pos</code> is
+     * set equal to <code>markpos</code>.
+     *
+     * @exception IOException if this stream has not been marked or,
+     *                if the mark has been invalidated, or the stream
+     *                has been closed by invoking its {@link #close()}
+     *                method, or an I/O error occurs.
+     * @see java.io.BufferedInputStream#mark(int)
+     */
+    @Override
+    public void reset() throws IOException {
+        getBufIfOpen(); // Cause exception if closed
+        if (markpos < 0) {
+            throw new IOException("Resetting to invalid mark");
+        }
+        pos = markpos;
+    }
+
+    /**
+     * Tests if this input stream supports the <code>mark</code>
+     * and <code>reset</code> methods. The <code>markSupported</code>
+     * method of <code>BufferedInputStream</code> returns
+     * <code>true</code>.
+     *
+     * @return a <code>boolean</code> indicating if this stream type supports
+     *         the <code>mark</code> and <code>reset</code> methods.
+     * @see java.io.InputStream#mark(int)
+     * @see java.io.InputStream#reset()
+     */
+    @Override
+    public boolean markSupported() {
+        return true;
     }
 
-    public BufferedInputStream(final InputStream in, final int size) {
-        super(in, size);
+    /**
+     * Closes this input stream and releases any system resources
+     * associated with the stream.
+     * Once the stream has been closed, further read(), available(), reset(),
+     * or skip() invocations will throw an IOException.
+     * Closing a previously closed stream has no effect.
+     *
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public void close() throws IOException {
+        this.in.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
index a30c14e..cb9ab36 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java
@@ -210,4 +210,42 @@ public class FormatUtils {
         return sb.toString();
     }
 
+    /**
+     * Formats nanoseconds in the format:
+     * 3 seconds, 8 millis, 3 nanos - if includeTotalNanos = false,
+     * 3 seconds, 8 millis, 3 nanos (3008000003 nanos) - if includeTotalNanos = true
+     *
+     * @param nanos the number of nanoseconds to format
+     * @param includeTotalNanos whether or not to include the total number of nanoseconds in parentheses in the returned value
+     * @return a human-readable String that is a formatted representation of the given number of nanoseconds.
+     */
+    public static String formatNanos(final long nanos, final boolean includeTotalNanos) {
+        final StringBuilder sb = new StringBuilder();
+
+        final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
+        long millis = nanos > 1000000L ? nanos / 1000000L : 0L;
+        final long nanosLeft = nanos % 1000000L;
+
+        if (seconds > 0) {
+            sb.append(seconds).append(" seconds");
+        }
+        if (millis > 0) {
+            if (seconds > 0) {
+                sb.append(", ");
+                millis -= seconds * 1000L;
+            }
+
+            sb.append(millis).append(" millis");
+        }
+        if (seconds > 0 || millis > 0) {
+            sb.append(", ");
+        }
+        sb.append(nanosLeft).append(" nanos");
+
+        if (includeTotalNanos) {
+            sb.append(" (").append(nanos).append(" nanos)");
+        }
+
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java
new file mode 100644
index 0000000..ddbf29b
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/CountSizeEntityAccess.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.timebuffer;
+
+public class CountSizeEntityAccess implements EntityAccess<TimedCountSize> {
+    @Override
+    public TimedCountSize aggregate(final TimedCountSize oldValue, final TimedCountSize toAdd) {
+        if (oldValue == null && toAdd == null) {
+            return new TimedCountSize(0L, 0L);
+        } else if (oldValue == null) {
+            return toAdd;
+        } else if (toAdd == null) {
+            return oldValue;
+        }
+
+        return new TimedCountSize(oldValue.getCount() + toAdd.getCount(), oldValue.getSize() + toAdd.getSize());
+    }
+
+    @Override
+    public TimedCountSize createNew() {
+        return new TimedCountSize(0L, 0L);
+    }
+
+    @Override
+    public long getTimestamp(final TimedCountSize entity) {
+        return entity == null ? 0L : entity.getTimestamp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java
new file mode 100644
index 0000000..f1df707
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedCountSize.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.util.timebuffer;
+
+public class TimedCountSize {
+    private final long count;
+    private final long size;
+    private final long timestamp = System.currentTimeMillis();
+
+    public TimedCountSize(final long count, final long size) {
+        this.count = count;
+        this.size = size;
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index f20f917..12e2b10 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -80,7 +80,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
     private final Path partialPath;
     private final Path snapshotPath;
 
-    private final SerDe<T> serde;
+    private final SerDeFactory<T> serdeFactory;
     private final SyncListener syncListener;
     private final FileChannel lockChannel;
     private final AtomicLong transactionIdGenerator = new AtomicLong(0L);
@@ -105,7 +105,15 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
     private volatile boolean recovered = false;
 
     public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
-        this(new TreeSet<>(Collections.singleton(path)), partitionCount, serde, syncListener);
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, new SingletonSerDeFactory<T>(serde), syncListener);
+    }
+
+    public MinimalLockingWriteAheadLog(final Path path, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
+        this(new TreeSet<>(Collections.singleton(path)), partitionCount, serdeFactory, syncListener);
+    }
+
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
+        this(paths, partitionCount, new SingletonSerDeFactory<T>(serde), syncListener);
     }
 
     /**
@@ -116,16 +124,16 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
      * @param partitionCount the number of partitions/journals to use. For best
      * performance, this should be close to the number of threads that are
      * expected to update the repository simultaneously
-     * @param serde the serializer/deserializer for records
+     * @param serdeFactory the factory for the serializer/deserializer for records
      * @param syncListener the listener
      * @throws IOException if unable to initialize due to IO issue
      */
     @SuppressWarnings("unchecked")
-    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException {
+    public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDeFactory<T> serdeFactory, final SyncListener syncListener) throws IOException {
         this.syncListener = syncListener;
 
         requireNonNull(paths);
-        requireNonNull(serde);
+        requireNonNull(serdeFactory);
 
         if (paths.isEmpty()) {
             throw new IllegalArgumentException("Paths must be non-empty");
@@ -172,7 +180,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         this.basePath = paths.iterator().next();
         this.partialPath = basePath.resolve("snapshot.partial");
         this.snapshotPath = basePath.resolve("snapshot");
-        this.serde = serde;
+        this.serdeFactory = serdeFactory;
 
         final Path lockPath = basePath.resolve("wali.lock");
         lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
@@ -189,7 +197,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
             final Path partitionBasePath = pathIterator.next();
 
-            partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion());
+            partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serdeFactory, i, getVersion());
         }
     }
 
@@ -242,13 +250,13 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                     }
 
                     for (final T record : records) {
-                        final UpdateType updateType = serde.getUpdateType(record);
-                        final Object recordIdentifier = serde.getRecordIdentifier(record);
+                        final UpdateType updateType = serdeFactory.getUpdateType(record);
+                        final Object recordIdentifier = serdeFactory.getRecordIdentifier(record);
 
                         if (updateType == UpdateType.DELETE) {
                             recordMap.remove(recordIdentifier);
                         } else if (updateType == UpdateType.SWAP_OUT) {
-                            final String newLocation = serde.getLocation(record);
+                            final String newLocation = serdeFactory.getLocation(record);
                             if (newLocation == null) {
                                 logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_OUT but "
                                         + "no indicator of where the Record is to be Swapped Out to; these records may be "
@@ -258,7 +266,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                                 this.externalLocations.add(newLocation);
                             }
                         } else if (updateType == UpdateType.SWAP_IN) {
-                            final String newLocation = serde.getLocation(record);
+                            final String newLocation = serdeFactory.getLocation(record);
                             if (newLocation == null) {
                                 logger.error("Received Record (ID=" + recordIdentifier + ") with UpdateType of SWAP_IN but no "
                                         + "indicator of where the Record is to be Swapped In from; these records may be duplicated "
@@ -360,11 +368,14 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                         + waliImplementationVersion + " of the " + waliImplementationClass + " class; cannot restore using Version " + getVersion());
             }
 
-            dataIn.readUTF(); // ignore serde class name for now
+            final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
             final int serdeVersion = dataIn.readInt();
             final long maxTransactionId = dataIn.readLong();
             final int numRecords = dataIn.readInt();
 
+            final SerDe<T> serde = serdeFactory.createSerDe(serdeEncoding);
+            serde.readHeader(dataIn);
+
             for (int i = 0; i < numRecords; i++) {
                 final T record = serde.deserializeRecord(dataIn, serdeVersion);
                 if (record == null) {
@@ -491,6 +502,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         long stopTheWorldNanos = -1L;
         long stopTheWorldStart = -1L;
         try {
+            final List<OutputStream> partitionStreams = new ArrayList<>();
+
             writeLock.lock();
             try {
                 stopTheWorldStart = System.nanoTime();
@@ -512,25 +525,48 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                 swapLocations = new HashSet<>(externalLocations);
                 for (final Partition<T> partition : partitions) {
                     try {
-                        partition.rollover();
+                        partitionStreams.add(partition.rollover());
                     } catch (final Throwable t) {
                         partition.blackList();
                         numberBlackListedPartitions.getAndIncrement();
                         throw t;
                     }
                 }
-
-                // notify global sync with the write lock held. We do this because we don't want the repository to get updated
-                // while the listener is performing its necessary tasks
-                if (syncListener != null) {
-                    syncListener.onGlobalSync();
-                }
             } finally {
                 writeLock.unlock();
             }
 
             stopTheWorldNanos = System.nanoTime() - stopTheWorldStart;
 
+            // Close all of the Partitions' Output Streams. We do this here, instead of in Partition.rollover()
+            // because we want to do this outside of the write lock. Because calling close() on FileOutputStream can
+            // be very expensive, as it has to flush the data to disk, we don't want to prevent other Process Sessions
+            // from getting committed. Since rollover() transitions the partition to write to a new file already, there
+            // is no reason that we need to close this FileOutputStream before releasing the write lock. Also, if any Exception
+            // does get thrown when calling close(), we don't need to blacklist the partition, as the stream that was getting
+            // closed is not the stream being written to for the partition anyway. We also catch any IOException and wait until
+            // after we've attempted to close all streams before we throw an Exception, to avoid resource leaks if one of them
+            // is unable to be closed (due to out of storage space, for instance).
+            IOException failure = null;
+            for (final OutputStream partitionStream : partitionStreams) {
+                try {
+                    partitionStream.close();
+                } catch (final IOException e) {
+                    failure = e;
+                }
+            }
+            if (failure != null) {
+                throw failure;
+            }
+
+            // notify global sync with the write lock held. We do this because we don't want the repository to get updated
+            // while the listener is performing its necessary tasks
+            if (syncListener != null) {
+                syncListener.onGlobalSync();
+            }
+
+            final SerDe<T> serde = serdeFactory.createSerDe(null);
+
             // perform checkpoint, writing to .partial file
             fileOut = new FileOutputStream(partialPath.toFile());
             dataOut = new DataOutputStream(fileOut);
@@ -540,6 +576,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
             dataOut.writeInt(serde.getVersion());
             dataOut.writeLong(maxTransactionId);
             dataOut.writeInt(records.size());
+            serde.writeHeader(dataOut);
 
             for (final T record : records) {
                 logger.trace("Checkpointing {}", record);
@@ -627,7 +664,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
         public static final String JOURNAL_EXTENSION = ".journal";
         private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
 
-        private final SerDe<S> serde;
+        private final SerDeFactory<S> serdeFactory;
+        private SerDe<S> serde;
 
         private final Path editDirectory;
         private final int writeAheadLogVersion;
@@ -650,9 +688,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
         private final Queue<Path> recoveryFiles;
 
-        public Partition(final Path path, final SerDe<S> serde, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
+        public Partition(final Path path, final SerDeFactory<S> serdeFactory, final int partitionIndex, final int writeAheadLogVersion) throws IOException {
             this.editDirectory = path;
-            this.serde = serde;
+            this.serdeFactory = serdeFactory;
 
             final File file = path.toFile();
             if (!file.exists() && !file.mkdirs()) {
@@ -744,24 +782,16 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
          *
          * @throws IOException if failure to rollover
          */
-        public void rollover() throws IOException {
+        public OutputStream rollover() throws IOException {
             lock.lock();
             try {
                 // Note that here we are closing fileOut and NOT dataOut. See the note in the close()
                 // method to understand the logic behind this.
-                final OutputStream out = fileOut;
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (final IOException ioe) {
-                        dataOut = null;
-                        fileOut = null;
-
-                        blackList();
-                        throw ioe;
-                    }
-                }
+                final OutputStream oldOutputStream = fileOut;
+                dataOut = null;
+                fileOut = null;
 
+                this.serde = serdeFactory.createSerDe(null);
                 final Path editPath = getNewEditPath();
                 final FileOutputStream fos = new FileOutputStream(editPath.toFile());
                 try {
@@ -770,10 +800,18 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                     outStream.writeInt(writeAheadLogVersion);
                     outStream.writeUTF(serde.getClass().getName());
                     outStream.writeInt(serde.getVersion());
+                    serde.writeHeader(outStream);
+
                     outStream.flush();
                     dataOut = outStream;
                     fileOut = fos;
                 } catch (final IOException ioe) {
+                    try {
+                        oldOutputStream.close();
+                    } catch (final IOException ioe2) {
+                        ioe.addSuppressed(ioe2);
+                    }
+
                     logger.error("Failed to create new journal for {} due to {}", new Object[] {this, ioe.toString()}, ioe);
                     try {
                         fos.close();
@@ -790,6 +828,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                 currentJournalFilename = editPath.toFile().getName();
 
                 blackListed = false;
+                return oldOutputStream;
             } finally {
                 lock.unlock();
             }
@@ -959,9 +998,11 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
                                 + "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
                     }
 
-                    @SuppressWarnings("unused")
-                    final String serdeClassName = recoveryIn.readUTF();
+                    final String serdeEncoding = recoveryIn.readUTF();
                     this.recoveryVersion = recoveryIn.readInt();
+                    serde = serdeFactory.createSerDe(serdeEncoding);
+
+                    serde.readHeader(recoveryIn);
 
                     break;
                 }
@@ -1009,12 +1050,15 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
 
             final Path newEditPath = getNewEditPath();
 
+            this.serde = serdeFactory.createSerDe(null);
             final FileOutputStream fos = new FileOutputStream(newEditPath.toFile());
             final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos));
             outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName());
             outStream.writeInt(writeAheadLogVersion);
             outStream.writeUTF(serde.getClass().getName());
             outStream.writeInt(serde.getVersion());
+            serde.writeHeader(outStream);
+
             outStream.flush();
             dataOut = outStream;
             fileOut = fos;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
index cc984a6..d1919e7 100644
--- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDe.java
@@ -30,6 +30,15 @@ import java.util.Map;
 public interface SerDe<T> {
 
     /**
+     * Provides the SerDe a chance to write header information to the given output stream
+     *
+     * @param out the DataOutputStream to write to
+     * @throws IOException if unable to write to the OutputStream
+     */
+    default void writeHeader(DataOutputStream out) throws IOException {
+    }
+
+    /**
      * <p>
      * Serializes an Edit Record to the log via the given
      * {@link DataOutputStream}.
@@ -55,6 +64,15 @@ public interface SerDe<T> {
     void serializeRecord(T record, DataOutputStream out) throws IOException;
 
     /**
+     * Provides the SerDe the opportunity to read header information before deserializing any records
+     *
+     * @param in the InputStream to read from
+     * @throws IOException if unable to read from the InputStream
+     */
+    default void readHeader(DataInputStream in) throws IOException {
+    }
+
+    /**
      * <p>
      * Reads an Edit Record from the given {@link DataInputStream} and merges
      * that edit with the current version of the record, returning the new,
@@ -65,9 +83,9 @@ public interface SerDe<T> {
      *
      * @param in to deserialize from
      * @param currentRecordStates an unmodifiable map of Record ID's to the
-     * current state of that record
+     *            current state of that record
      * @param version the version of the SerDe that was used to serialize the
-     * edit record
+     *            edit record
      * @return deserialized record
      * @throws IOException if failure reading
      */
@@ -125,4 +143,12 @@ public interface SerDe<T> {
      * @return version
      */
     int getVersion();
+
+    /**
+     * Closes any resources that the SerDe is holding open
+     *
+     * @throws IOException if unable to close resources
+     */
+    default void close() throws IOException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java
new file mode 100644
index 0000000..09e6f7b
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SerDeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.wali;
+
+public interface SerDeFactory<T> {
+
+    /**
+     * Returns a new SerDe
+     *
+     * @param encodingName the name of encoding that was used when writing the serialized data, or <code>null</code> if
+     *            the SerDe is to be used for serialization purposes
+     * @return a SerDe
+     */
+    SerDe<T> createSerDe(String encodingName);
+
+    /**
+     * Returns the unique ID for the given record
+     *
+     * @param record to obtain identifier for
+     * @return identifier of record
+     */
+    Object getRecordIdentifier(T record);
+
+    /**
+     * Returns the UpdateType for the given record
+     *
+     * @param record to retrieve update type for
+     * @return update type
+     */
+    UpdateType getUpdateType(T record);
+
+    /**
+     * Returns the external location of the given record; this is used when a
+     * record is moved away from WALI or is being re-introduced to WALI. For
+     * example, WALI can be updated with a record of type
+     * {@link UpdateType#SWAP_OUT} that indicates a Location of
+     * file://tmp/external1 and can then be re-introduced to WALI by updating
+     * WALI with a record of type {@link UpdateType#CREATE} that indicates a
+     * Location of file://tmp/external1
+     *
+     * @param record to get location of
+     * @return location
+     */
+    String getLocation(T record);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java
new file mode 100644
index 0000000..403f082
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/SingletonSerDeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.wali;
+
+public class SingletonSerDeFactory<T> implements SerDeFactory<T> {
+    private final SerDe<T> serde;
+
+    public SingletonSerDeFactory(final SerDe<T> serde) {
+        this.serde = serde;
+    }
+
+    @Override
+    public SerDe<T> createSerDe(final String encodingName) {
+        return serde;
+    }
+
+    @Override
+    public Object getRecordIdentifier(final T record) {
+        return serde.getRecordIdentifier(record);
+    }
+
+    @Override
+    public UpdateType getUpdateType(final T record) {
+        return serde.getUpdateType(record);
+    }
+
+    @Override
+    public String getLocation(final T record) {
+        return serde.getLocation(record);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index 558110a..27810cd 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -38,5 +38,6 @@
 		<module>nifi-site-to-site-client</module>
 		<module>nifi-hl7-query-language</module>
         <module>nifi-hadoop-utils</module>
+        <module>nifi-schema-utils</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
index bb78896..7e4495b 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -64,4 +64,28 @@ public interface ResourceClaim extends Comparable<ResourceClaim> {
      * @return <code>true</code> if the Resource Claim is in use, <code>false</code> otherwise
      */
     boolean isInUse();
+
+
+    /**
+     * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
+     *
+     * @param other other claim
+     * @return x such that x <= -1 if this is less than other;
+     *         x=0 if this.equals(other);
+     *         x >= 1 if this is greater than other
+     */
+    @Override
+    default int compareTo(final ResourceClaim other) {
+        final int idComparison = getId().compareTo(other.getId());
+        if (idComparison != 0) {
+            return idComparison;
+        }
+
+        final int containerComparison = getContainer().compareTo(other.getContainer());
+        if (containerComparison != 0) {
+            return containerComparison;
+        }
+
+        return getSection().compareTo(other.getSection());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index fdb5a83..7ddae36 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -49,6 +49,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-nar-utils</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 350cceb..208bbce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -28,34 +29,31 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Pattern;
+
 import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.IncompleteSwapFileException;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.SwapContents;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.swap.StandardSwapContents;
-import org.apache.nifi.controller.swap.StandardSwapSummary;
+import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
+import org.apache.nifi.controller.swap.SchemaSwapSerializer;
+import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.SwapDeserializer;
+import org.apache.nifi.controller.swap.SwapSerializer;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +81,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private EventReporter eventReporter;
     private ResourceClaimManager claimManager;
 
+    private static final byte[] MAGIC_HEADER = {'S', 'W', 'A', 'P'};
+
     /**
      * Default no args constructor for service loading only.
      */
@@ -116,8 +116,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
         final String swapLocation = swapFile.getAbsolutePath();
 
-        try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
-            serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+        final SwapSerializer serializer = new SchemaSwapSerializer();
+        try (final FileOutputStream fos = new FileOutputStream(swapTempFile);
+            final OutputStream out = new BufferedOutputStream(fos)) {
+            out.write(MAGIC_HEADER);
+            final DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(serializer.getSerializationName());
+
+            serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);
             fos.getFD().sync();
         } catch (final IOException ioe) {
             // we failed to write out the entire swap file. Delete the temporary file, if we can.
@@ -154,14 +160,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
         }
 
-        final SwapContents swapContents;
         try (final InputStream fis = new FileInputStream(swapFile);
                 final InputStream bis = new BufferedInputStream(fis);
                 final DataInputStream in = new DataInputStream(bis)) {
-            swapContents = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
-        }
 
-        return swapContents;
+            final SwapDeserializer deserializer = createSwapDeserializer(in);
+            return deserializer.deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
+        }
     }
 
     @Override
@@ -210,11 +215,13 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             // "<timestamp>-<queue identifier>-<random uuid>.swap". If we have two dashes, then we can just check if the queue ID is equal
             // to the id of the queue given and if not we can just move on.
             final String[] splits = swapFile.getName().split("-");
-            if (splits.length == 3) {
-                final String queueIdentifier = splits[1];
-                if (!queueIdentifier.equals(flowFileQueue.getIdentifier())) {
-                    continue;
+            if (splits.length > 6) {
+                final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
+                if (queueIdentifier.equals(flowFileQueue.getIdentifier())) {
+                    swapLocations.add(swapFile.getAbsolutePath());
                 }
+
+                continue;
             }
 
             // Read the queue identifier from the swap file to check if the swap file is for this queue
@@ -222,18 +229,22 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     final InputStream bufferedIn = new BufferedInputStream(fis);
                     final DataInputStream in = new DataInputStream(bufferedIn)) {
 
-                final int swapEncodingVersion = in.readInt();
-                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                    final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
-                            + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
+                final SwapDeserializer deserializer;
+                try {
+                    deserializer = createSwapDeserializer(in);
+                } catch (final Exception e) {
+                    final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " due to " + e;
                     eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
                     throw new IOException(errMsg);
                 }
 
-                final String connectionId = in.readUTF();
-                if (connectionId.equals(flowFileQueue.getIdentifier())) {
-                    swapLocations.add(swapFile.getAbsolutePath());
+                // If deserializer is not an instance of Simple Swap Deserializer, then it means that the serializer is new enough that
+                // we use the 3-element filename as illustrated above, so this is only necessary for the SimpleSwapDeserializer.
+                if (deserializer instanceof SimpleSwapDeserializer) {
+                    final String connectionId = in.readUTF();
+                    if (connectionId.equals(flowFileQueue.getIdentifier())) {
+                        swapLocations.add(swapFile.getAbsolutePath());
+                    }
                 }
             }
         }
@@ -251,353 +262,36 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                 final InputStream bufferedIn = new BufferedInputStream(fis);
                 final DataInputStream in = new DataInputStream(bufferedIn)) {
 
-            final int swapEncodingVersion = in.readInt();
-            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
-                        + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
-                throw new IOException(errMsg);
-            }
-
-            final int numRecords;
-            final long contentSize;
-            Long maxRecordId = null;
-            try {
-                in.readUTF(); // ignore Connection ID
-                numRecords = in.readInt();
-                contentSize = in.readLong();
-
-                if (numRecords == 0) {
-                    return StandardSwapSummary.EMPTY_SUMMARY;
-                }
-
-                if (swapEncodingVersion > 7) {
-                    maxRecordId = in.readLong();
-                }
-            } catch (final EOFException eof) {
-                logger.warn("Found premature End-of-File when reading Swap File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
-                return StandardSwapSummary.EMPTY_SUMMARY;
-            }
-
-            final QueueSize queueSize = new QueueSize(numRecords, contentSize);
-            final SwapContents swapContents = deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
-            return swapContents.getSummary();
-        }
-    }
-
-    public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
-        if (toSwap == null || toSwap.isEmpty()) {
-            return 0;
-        }
-
-        long contentSize = 0L;
-        for (final FlowFileRecord record : toSwap) {
-            contentSize += record.getSize();
-        }
-
-        // persist record to disk via the swap file
-        final OutputStream bufferedOut = new BufferedOutputStream(destination);
-        final DataOutputStream out = new DataOutputStream(bufferedOut);
-        try {
-            out.writeInt(SWAP_ENCODING_VERSION);
-            out.writeUTF(queue.getIdentifier());
-            out.writeInt(toSwap.size());
-            out.writeLong(contentSize);
-
-            // get the max record id and write that out so that we know it quickly for restoration
-            long maxRecordId = 0L;
-            for (final FlowFileRecord flowFile : toSwap) {
-                if (flowFile.getId() > maxRecordId) {
-                    maxRecordId = flowFile.getId();
-                }
-            }
-
-            out.writeLong(maxRecordId);
-
-            for (final FlowFileRecord flowFile : toSwap) {
-                out.writeLong(flowFile.getId());
-                out.writeLong(flowFile.getEntryDate());
-                out.writeLong(flowFile.getLineageStartDate());
-                out.writeLong(flowFile.getLineageStartIndex());
-                out.writeLong(flowFile.getLastQueueDate());
-                out.writeLong(flowFile.getQueueDateIndex());
-                out.writeLong(flowFile.getSize());
-
-                final ContentClaim claim = flowFile.getContentClaim();
-                if (claim == null) {
-                    out.writeBoolean(false);
-                } else {
-                    out.writeBoolean(true);
-                    final ResourceClaim resourceClaim = claim.getResourceClaim();
-                    out.writeUTF(resourceClaim.getId());
-                    out.writeUTF(resourceClaim.getContainer());
-                    out.writeUTF(resourceClaim.getSection());
-                    out.writeLong(claim.getOffset());
-                    out.writeLong(claim.getLength());
-                    out.writeLong(flowFile.getContentClaimOffset());
-                    out.writeBoolean(resourceClaim.isLossTolerant());
-                }
-
-                final Map<String, String> attributes = flowFile.getAttributes();
-                out.writeInt(attributes.size());
-                for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                    writeString(entry.getKey(), out);
-                    writeString(entry.getValue(), out);
-                }
-            }
-        } finally {
-            out.flush();
+            final SwapDeserializer deserializer = createSwapDeserializer(in);
+            return deserializer.getSwapSummary(in, swapLocation, claimManager);
         }
-
-        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", toSwap.size(), queue, swapLocation);
-
-        return toSwap.size();
     }
 
-    private static void writeString(final String toWrite, final OutputStream out) throws IOException {
-        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
-        final int utflen = bytes.length;
 
-        if (utflen < 65535) {
-            out.write(utflen >>> 8);
-            out.write(utflen);
-            out.write(bytes);
-        } else {
-            out.write(255);
-            out.write(255);
-            out.write(utflen >>> 24);
-            out.write(utflen >>> 16);
-            out.write(utflen >>> 8);
-            out.write(utflen);
-            out.write(bytes);
-        }
-    }
-
-    static SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
-        final int swapEncodingVersion = in.readInt();
-        if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-            throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
-                    + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
-        }
+    private SwapDeserializer createSwapDeserializer(final DataInputStream dis) throws IOException {
+        dis.mark(MAGIC_HEADER.length);
 
-        final String connectionId = in.readUTF(); // Connection ID
-        if (!connectionId.equals(queue.getIdentifier())) {
-            throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation
-                    + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
-        }
-
-        int numRecords = 0;
-        long contentSize = 0L;
-        Long maxRecordId = null;
+        final byte[] magicHeader = new byte[MAGIC_HEADER.length];
         try {
-            numRecords = in.readInt();
-            contentSize = in.readLong(); // Content Size
-            if (swapEncodingVersion > 7) {
-                maxRecordId = in.readLong(); // Max Record ID
-            }
+            StreamUtils.fillBuffer(dis, magicHeader);
         } catch (final EOFException eof) {
-            final QueueSize queueSize = new QueueSize(numRecords, contentSize);
-            final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList());
-            final SwapContents partialContents = new StandardSwapContents(summary, Collections.emptyList());
-            throw new IncompleteSwapFileException(swapLocation, partialContents);
+            throw new IOException("Failed to read swap file because the file contained less than 4 bytes of data");
         }
 
-        final QueueSize queueSize = new QueueSize(numRecords, contentSize);
-        return deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, false, claimManager, swapLocation);
-    }
-
-    private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId,
-            final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager, final String location) throws IOException {
-        final List<FlowFileRecord> flowFiles = new ArrayList<>(queueSize.getObjectCount());
-        final List<ResourceClaim> resourceClaims = new ArrayList<>(queueSize.getObjectCount());
-        Long maxId = maxRecordId;
-
-        for (int i = 0; i < queueSize.getObjectCount(); i++) {
-            try {
-                // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
-                if (serializationVersion < 3) {
-                    final int action = in.read();
-                    if (action != 1) {
-                        throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
-                    }
-                }
-
-                final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-                final long recordId = in.readLong();
-                if (maxId == null || recordId > maxId) {
-                    maxId = recordId;
-                }
-
-                ffBuilder.id(recordId);
-                ffBuilder.entryDate(in.readLong());
-
-                if (serializationVersion > 1) {
-                    // Lineage information was added in version 2
-                    if (serializationVersion < 10) {
-                        final int numLineageIdentifiers = in.readInt();
-                        for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
-                            in.readUTF(); //skip each identifier
-                        }
-                    }
-
-                    // version 9 adds in a 'lineage start index'
-                    final long lineageStartDate = in.readLong();
-                    final long lineageStartIndex;
-                    if (serializationVersion > 8) {
-                        lineageStartIndex = in.readLong();
-                    } else {
-                        lineageStartIndex = 0L;
-                    }
-
-                    ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
-                    if (serializationVersion > 5) {
-                        // Version 9 adds in a 'queue date index'
-                        final long lastQueueDate = in.readLong();
-                        final long queueDateIndex;
-                        if (serializationVersion > 8) {
-                            queueDateIndex = in.readLong();
-                        } else {
-                            queueDateIndex = 0L;
-                        }
-
-                        ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-                    }
-                }
-
-                ffBuilder.size(in.readLong());
-
-                if (serializationVersion < 3) {
-                    readString(in); // connection Id
-                }
-
-                final boolean hasClaim = in.readBoolean();
-                ResourceClaim resourceClaim = null;
-                if (hasClaim) {
-                    final String claimId;
-                    if (serializationVersion < 5) {
-                        claimId = String.valueOf(in.readLong());
-                    } else {
-                        claimId = in.readUTF();
-                    }
-
-                    final String container = in.readUTF();
-                    final String section = in.readUTF();
-
-                    final long resourceOffset;
-                    final long resourceLength;
-                    if (serializationVersion < 6) {
-                        resourceOffset = 0L;
-                        resourceLength = -1L;
-                    } else {
-                        resourceOffset = in.readLong();
-                        resourceLength = in.readLong();
-                    }
-
-                    final long claimOffset = in.readLong();
-
-                    final boolean lossTolerant;
-                    if (serializationVersion >= 4) {
-                        lossTolerant = in.readBoolean();
-                    } else {
-                        lossTolerant = false;
-                    }
-
-                    resourceClaim = claimManager.getResourceClaim(container, section, claimId);
-                    if (resourceClaim == null) {
-                        logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
-                            + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
-                            + "ability to properly clean up this resource", container, section, claimId);
-                        resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
-                    }
-
-                    final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
-                    claim.setLength(resourceLength);
-
-                    if (incrementContentClaims) {
-                        claimManager.incrementClaimantCount(resourceClaim);
-                    }
-
-                    ffBuilder.contentClaim(claim);
-                    ffBuilder.contentClaimOffset(claimOffset);
-                }
-
-                boolean attributesChanged = true;
-                if (serializationVersion < 3) {
-                    attributesChanged = in.readBoolean();
-                }
-
-                if (attributesChanged) {
-                    final int numAttributes = in.readInt();
-                    for (int j = 0; j < numAttributes; j++) {
-                        final String key = readString(in);
-                        final String value = readString(in);
-
-                        ffBuilder.addAttribute(key, value);
-                    }
-                }
-
-                final FlowFileRecord record = ffBuilder.build();
-                if (resourceClaim != null) {
-                    resourceClaims.add(resourceClaim);
-                }
-
-                flowFiles.add(record);
-            } catch (final EOFException eof) {
-                final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
-                final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles);
-                throw new IncompleteSwapFileException(location, partialContents);
+        if (Arrays.equals(magicHeader, MAGIC_HEADER)) {
+            final String serializationName = dis.readUTF();
+            if (serializationName.equals(SchemaSwapDeserializer.getSerializationName())) {
+                return new SchemaSwapDeserializer();
             }
-        }
-
-        final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
-        return new StandardSwapContents(swapSummary, flowFiles);
-    }
 
-    private static String readString(final InputStream in) throws IOException {
-        final Integer numBytes = readFieldLength(in);
-        if (numBytes == null) {
-            throw new EOFException();
-        }
-        final byte[] bytes = new byte[numBytes];
-        fillBuffer(in, bytes, numBytes);
-        return new String(bytes, StandardCharsets.UTF_8);
-    }
-
-    private static Integer readFieldLength(final InputStream in) throws IOException {
-        final int firstValue = in.read();
-        final int secondValue = in.read();
-        if (firstValue < 0) {
-            return null;
-        }
-        if (secondValue < 0) {
-            throw new EOFException();
-        }
-        if (firstValue == 0xff && secondValue == 0xff) {
-            final int ch1 = in.read();
-            final int ch2 = in.read();
-            final int ch3 = in.read();
-            final int ch4 = in.read();
-            if ((ch1 | ch2 | ch3 | ch4) < 0) {
-                throw new EOFException();
-            }
-            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
+            throw new IOException("Cannot find a suitable Deserializer for swap file, written with Serialization Name '" + serializationName + "'");
         } else {
-            return (firstValue << 8) + secondValue;
+            // SimpleSwapDeserializer is old and did not write out a magic header.
+            dis.reset();
+            return new SimpleSwapDeserializer();
         }
     }
 
-    private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
-        int bytesRead;
-        int totalBytesRead = 0;
-        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
-            totalBytesRead += bytesRead;
-        }
-        if (totalBytesRead != length) {
-            throw new EOFException();
-        }
-    }
 
     private void error(final String error) {
         logger.error(error);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 68af208..e20e250 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -820,6 +820,7 @@ public class StandardFlowFileQueue implements FlowFileQueue {
         long swapByteCount = 0L;
         Long maxId = null;
         List<ResourceClaim> resourceClaims = new ArrayList<>();
+        final long startNanos = System.nanoTime();
 
         writeLock.lock();
         try {
@@ -866,6 +867,11 @@ public class StandardFlowFileQueue implements FlowFileQueue {
             writeLock.unlock("Recover Swap Files");
         }
 
+        if (!swapLocations.isEmpty()) {
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size(), this, millis);
+        }
+
         return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
new file mode 100644
index 0000000..44ed62d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+    private Map<String, FlowFileQueue> flowFileQueueMap = null;
+
+    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
+        this.flowFileQueueMap = queueMap;
+    }
+
+    protected Map<String, FlowFileQueue> getQueueMap() {
+        return flowFileQueueMap;
+    }
+
+    protected FlowFileQueue getFlowFileQueue(final String queueId) {
+        return flowFileQueueMap.get(queueId);
+    }
+
+    @Override
+    public Long getRecordIdentifier(final RepositoryRecord record) {
+        return record.getCurrent().getId();
+    }
+
+    @Override
+    public UpdateType getUpdateType(final RepositoryRecord record) {
+        switch (record.getType()) {
+            case CONTENTMISSING:
+            case DELETE:
+                return UpdateType.DELETE;
+            case CREATE:
+                return UpdateType.CREATE;
+            case UPDATE:
+                return UpdateType.UPDATE;
+            case SWAP_OUT:
+                return UpdateType.SWAP_OUT;
+            case SWAP_IN:
+                return UpdateType.SWAP_IN;
+        }
+        return null;
+    }
+
+    @Override
+    public String getLocation(final RepositoryRecord record) {
+        return record.getSwapLocation();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
new file mode 100644
index 0000000..c19fa94
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.wali.SerDe;
+import org.wali.SerDeFactory;
+import org.wali.UpdateType;
+
+public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> {
+    private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
+    private final ResourceClaimManager resourceClaimManager;
+    private Map<String, FlowFileQueue> flowFileQueueMap = null;
+
+    public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
+        this.resourceClaimManager = claimManager;
+    }
+
+    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
+        this.flowFileQueueMap = queueMap;
+    }
+
+    protected Map<String, FlowFileQueue> getQueueMap() {
+        return flowFileQueueMap;
+    }
+
+    @Override
+    public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
+        if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
+            final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
+            serde.setQueueMap(flowFileQueueMap);
+            return serde;
+        }
+
+        if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
+            || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
+            final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager);
+            serde.setQueueMap(flowFileQueueMap);
+            return serde;
+        }
+
+        throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known");
+    }
+
+    protected FlowFileQueue getFlowFileQueue(final String queueId) {
+        return flowFileQueueMap.get(queueId);
+    }
+
+    @Override
+    public Long getRecordIdentifier(final RepositoryRecord record) {
+        return record.getCurrent().getId();
+    }
+
+    @Override
+    public UpdateType getUpdateType(final RepositoryRecord record) {
+        switch (record.getType()) {
+            case CONTENTMISSING:
+            case DELETE:
+                return UpdateType.DELETE;
+            case CREATE:
+                return UpdateType.CREATE;
+            case UPDATE:
+                return UpdateType.UPDATE;
+            case SWAP_OUT:
+                return UpdateType.SWAP_OUT;
+            case SWAP_IN:
+                return UpdateType.SWAP_IN;
+        }
+        return null;
+    }
+
+    @Override
+    public String getLocation(final RepositoryRecord record) {
+        return record.getSwapLocation();
+    }
+
+}