You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:04 UTC

[4/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
new file mode 100644
index 0000000..76c208d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.List;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+/**
+ * Adapts a {@link ContentClaim}, plus an offset into that claim, to the {@link Record} interface
+ * using the field names defined in {@link ContentClaimSchema}.
+ */
+public class ContentClaimFieldMap implements Record {
+    private final RecordSchema schema;
+    private final ContentClaim contentClaim;
+    private final long contentClaimOffset;
+    private final ResourceClaimFieldMap resourceClaimFieldMap;
+
+    /**
+     * Wraps the given claim and eagerly builds the nested record for its resource claim.
+     *
+     * @param contentClaim the claim to expose; it is dereferenced here, so it must not be null
+     * @param contentClaimOffset the value reported for {@link ContentClaimSchema#CONTENT_CLAIM_OFFSET}
+     * @param schema the schema returned by {@link #getSchema()}; its {@link ContentClaimSchema#RESOURCE_CLAIM}
+     *            field is looked up here to obtain the sub-fields of the nested record
+     */
+    public ContentClaimFieldMap(final ContentClaim contentClaim, final long contentClaimOffset, final RecordSchema schema) {
+        this.schema = schema;
+        this.contentClaim = contentClaim;
+        this.contentClaimOffset = contentClaimOffset;
+
+        // The nested record is described by the sub-fields of this schema's "Resource Claim" field.
+        final List<RecordField> nestedFields = schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields();
+        final RecordSchema nestedSchema = new RecordSchema(nestedFields);
+        resourceClaimFieldMap = new ResourceClaimFieldMap(contentClaim.getResourceClaim(), nestedSchema);
+    }
+
+    /**
+     * Looks up the value for one of the content claim field names.
+     *
+     * @param fieldName one of the field names declared in {@link ContentClaimSchema}
+     * @return the nested resource claim record, the claim length, the offset into the claim or the
+     *         claim's offset into its resource claim; {@code null} for any other name
+     */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        if (fieldName.equals(ContentClaimSchema.RESOURCE_CLAIM)) {
+            return resourceClaimFieldMap;
+        }
+        if (fieldName.equals(ContentClaimSchema.CONTENT_CLAIM_LENGTH)) {
+            return contentClaim.getLength();
+        }
+        if (fieldName.equals(ContentClaimSchema.CONTENT_CLAIM_OFFSET)) {
+            return contentClaimOffset;
+        }
+        if (fieldName.equals(ContentClaimSchema.RESOURCE_CLAIM_OFFSET)) {
+            return contentClaim.getOffset();
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the schema supplied to the constructor
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public String toString() {
+        return "ContentClaimFieldMap[" + contentClaim + "]";
+    }
+
+    /**
+     * Rebuilds a {@link ContentClaim} from a record that carries the content claim fields.
+     *
+     * @param claimRecord record holding the nested resource claim record, the claim length and the resource claim offset
+     * @param resourceClaimManager manager used to create the {@link ResourceClaim} for the decoded coordinates
+     * @return a {@link StandardContentClaim} with the decoded resource claim, offset and length
+     */
+    public static ContentClaim getContentClaim(final Record claimRecord, final ResourceClaimManager resourceClaimManager) {
+        // The coordinates of the resource claim are stored in the nested record.
+        final Record nested = (Record) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM);
+        final String container = (String) nested.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+        final String section = (String) nested.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+        final String identifier = (String) nested.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+        final Boolean lossTolerant = (Boolean) nested.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+        // Length and offset belong to the content claim record itself.
+        final Long claimLength = (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH);
+        final Long offsetInResource = (Long) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET);
+
+        final ResourceClaim rebuiltResourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, lossTolerant, false);
+        final StandardContentClaim rebuilt = new StandardContentClaim(rebuiltResourceClaim, offsetInResource);
+        rebuilt.setLength(claimLength);
+        return rebuilt;
+    }
+
+    /**
+     * @param claimRecord record holding the content claim fields
+     * @return the value stored under {@link ContentClaimSchema#CONTENT_CLAIM_OFFSET}, cast to {@link Long}
+     */
+    public static Long getContentClaimOffset(final Record claimRecord) {
+        final Object storedOffset = claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_OFFSET);
+        return (Long) storedOffset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
new file mode 100644
index 0000000..c55c758
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+/**
+ * Field names and version 1 schemas for content claims and the resource claims nested inside them.
+ */
+public class ContentClaimSchema {
+
+    // Names of the fields that describe a resource claim
+    public static final String CLAIM_CONTAINER = "Container";
+    public static final String CLAIM_SECTION = "Section";
+    public static final String CLAIM_IDENTIFIER = "Identifier";
+    public static final String LOSS_TOLERANT = "Loss Tolerant";
+    public static final String RESOURCE_CLAIM = "Resource Claim";
+
+    // Names of the fields that describe a content claim
+    public static final String RESOURCE_CLAIM_OFFSET = "Resource Claim Offset"; // offset into resource claim where the content claim begins
+    public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; // offset into the content claim where the flowfile begins
+    public static final String CONTENT_CLAIM_LENGTH = "Content Claim Length";
+
+    public static final RecordSchema CONTENT_CLAIM_SCHEMA_V1;
+    public static final RecordSchema RESOURCE_CLAIM_SCHEMA_V1;
+
+    static {
+        // Resource claim: container, section, identifier and the loss-tolerant flag, in that order.
+        final List<RecordField> resourceFields = new ArrayList<>();
+        resourceFields.add(new SimpleRecordField(CLAIM_CONTAINER, FieldType.STRING, Repetition.EXACTLY_ONE));
+        resourceFields.add(new SimpleRecordField(CLAIM_SECTION, FieldType.STRING, Repetition.EXACTLY_ONE));
+        resourceFields.add(new SimpleRecordField(CLAIM_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+        resourceFields.add(new SimpleRecordField(LOSS_TOLERANT, FieldType.BOOLEAN, Repetition.EXACTLY_ONE));
+        RESOURCE_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(resourceFields));
+
+        // Content claim: the nested resource claim followed by the three long-valued fields.
+        final RecordField nestedResourceClaim = new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, resourceFields);
+        final List<RecordField> claimFields = new ArrayList<>();
+        claimFields.add(nestedResourceClaim);
+        claimFields.add(new SimpleRecordField(RESOURCE_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE));
+        claimFields.add(new SimpleRecordField(CONTENT_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE));
+        claimFields.add(new SimpleRecordField(CONTENT_CLAIM_LENGTH, FieldType.LONG, Repetition.EXACTLY_ONE));
+        CONTENT_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(claimFields));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
new file mode 100644
index 0000000..ff0615f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+/**
+ * Adapts a {@link FlowFileRecord} to the {@link Record} interface using the field names defined in
+ * {@link FlowFileSchema}, and provides the reverse conversion via {@link #getFlowFile(Record, ResourceClaimManager)}.
+ */
+public class FlowFileRecordFieldMap implements Record {
+    private final FlowFileRecord flowFile;
+    private final RecordSchema schema;
+    private final RecordSchema contentClaimSchema;
+    private final ContentClaimFieldMap contentClaim;
+
+    /**
+     * Wraps the given FlowFile and, when it has a content claim, eagerly wraps that claim as well.
+     *
+     * @param flowFile the FlowFile whose values are exposed
+     * @param schema the schema returned by {@link #getSchema()}; its {@link FlowFileSchema#CONTENT_CLAIM}
+     *            field supplies the sub-fields used for the nested content claim record
+     */
+    public FlowFileRecordFieldMap(final FlowFileRecord flowFile, final RecordSchema schema) {
+        this.flowFile = flowFile;
+        this.schema = schema;
+
+        final RecordField claimField = schema.getField(FlowFileSchema.CONTENT_CLAIM);
+        this.contentClaimSchema = new RecordSchema(claimField.getSubFields());
+
+        // A FlowFile without content has no nested claim record.
+        if (flowFile.getContentClaim() == null) {
+            this.contentClaim = null;
+        } else {
+            this.contentClaim = new ContentClaimFieldMap(flowFile.getContentClaim(), flowFile.getContentClaimOffset(), contentClaimSchema);
+        }
+    }
+
+    /**
+     * @return the schema supplied to the constructor
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /**
+     * Looks up the value for one of the FlowFile field names.
+     *
+     * @param fieldName one of the field names declared in {@link FlowFileSchema}
+     * @return the matching value of the wrapped FlowFile, the nested content claim record (possibly
+     *         {@code null}), or {@code null} for any other name
+     */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case FlowFileSchema.RECORD_ID:
+                return flowFile.getId();
+            case FlowFileSchema.ENTRY_DATE:
+                return flowFile.getEntryDate();
+            case FlowFileSchema.LINEAGE_START_DATE:
+                return flowFile.getLineageStartDate();
+            case FlowFileSchema.LINEAGE_START_INDEX:
+                return flowFile.getLineageStartIndex();
+            case FlowFileSchema.QUEUE_DATE:
+                return flowFile.getLastQueueDate();
+            case FlowFileSchema.QUEUE_DATE_INDEX:
+                return flowFile.getQueueDateIndex();
+            case FlowFileSchema.FLOWFILE_SIZE:
+                return flowFile.getSize();
+            case FlowFileSchema.CONTENT_CLAIM:
+                return contentClaim;
+            case FlowFileSchema.ATTRIBUTES:
+                return flowFile.getAttributes();
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Rebuilds a {@link FlowFileRecord} from a record that carries the {@link FlowFileSchema} fields.
+     * The content claim and its offset are only applied when the record has a value for
+     * {@link FlowFileSchema#CONTENT_CLAIM}.
+     *
+     * @param record the record to read the FlowFile fields from
+     * @param claimManager manager handed to {@link ContentClaimFieldMap#getContentClaim(Record, ResourceClaimManager)}
+     * @return the FlowFile produced by the {@link StandardFlowFileRecord.Builder}
+     */
+    @SuppressWarnings("unchecked")
+    public static FlowFileRecord getFlowFile(final Record record, final ResourceClaimManager claimManager) {
+        final StandardFlowFileRecord.Builder flowFileBuilder = new StandardFlowFileRecord.Builder();
+
+        final Long recordId = (Long) record.getFieldValue(FlowFileSchema.RECORD_ID);
+        flowFileBuilder.id(recordId);
+
+        final Long entryDate = (Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE);
+        flowFileBuilder.entryDate(entryDate);
+
+        final Long size = (Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE);
+        flowFileBuilder.size(size);
+
+        final Map<String, String> attributes = (Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES);
+        flowFileBuilder.addAttributes(attributes);
+
+        final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE);
+        final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX);
+        flowFileBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+        final Long queueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE);
+        final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX);
+        flowFileBuilder.lastQueued(queueDate, queueDateIndex);
+
+        final Record claimRecord = (Record) record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
+        if (claimRecord == null) {
+            // No content claim was stored: build the FlowFile without one.
+            return flowFileBuilder.build();
+        }
+
+        final ContentClaim restoredClaim = ContentClaimFieldMap.getContentClaim(claimRecord, claimManager);
+        flowFileBuilder.contentClaim(restoredClaim);
+
+        final Long claimOffset = ContentClaimFieldMap.getContentClaimOffset(claimRecord);
+        if (claimOffset != null) {
+            flowFileBuilder.contentClaimOffset(claimOffset);
+        }
+
+        return flowFileBuilder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
new file mode 100644
index 0000000..53eab70
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+/**
+ * Field names and the version 1 schema for a serialized FlowFile.
+ */
+public class FlowFileSchema {
+
+    // Names of the top-level FlowFile fields
+    public static final String RECORD_ID = "Record ID";
+    public static final String ENTRY_DATE = "Entry Date";
+    public static final String LINEAGE_START_DATE = "Lineage Start Date";
+    public static final String LINEAGE_START_INDEX = "Lineage Start Index";
+    public static final String QUEUE_DATE = "Queued Date";
+    public static final String QUEUE_DATE_INDEX = "Queued Date Index";
+    public static final String FLOWFILE_SIZE = "FlowFile Size";
+    public static final String CONTENT_CLAIM = "Content Claim";
+    public static final String ATTRIBUTES = "Attributes";
+
+    // Names of the key and value fields inside the attributes map
+    public static final String ATTRIBUTE_NAME = "Attribute Name";
+    public static final String ATTRIBUTE_VALUE = "Attribute Value";
+
+    public static final RecordSchema FLOWFILE_SCHEMA_V1;
+
+    static {
+        final List<RecordField> fields = new ArrayList<>();
+
+        // Seven mandatory long-valued fields come first, in this order.
+        fields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE));
+        fields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE));
+
+        // The content claim is optional and reuses the fields of the version 1 content claim schema.
+        fields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields()));
+
+        // The attributes are an optional map of string names to string values.
+        final RecordField attributeKey = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.STRING, Repetition.EXACTLY_ONE);
+        final RecordField attributeValue = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+        fields.add(new MapRecordField(ATTRIBUTES, attributeKey, attributeValue, Repetition.ZERO_OR_ONE));
+
+        FLOWFILE_SCHEMA_V1 = new RecordSchema(fields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
new file mode 100644
index 0000000..9804dec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+/**
+ * Adapts a {@link RepositoryRecord} to the {@link Record} interface, answering both the
+ * {@link RepositoryRecordSchema} field names and the {@link FlowFileSchema} field names.
+ */
+public class RepositoryRecordFieldMap implements Record {
+    private final RepositoryRecord record;
+    private final FlowFileRecord flowFile;
+    private final RecordSchema schema;
+    private final RecordSchema contentClaimSchema;
+
+    /**
+     * Wraps the given repository record. The FlowFile returned by {@link RepositoryRecord#getCurrent()}
+     * is captured once, here, and used for all FlowFile-level fields.
+     *
+     * @param record the repository record whose values are exposed
+     * @param repoRecordSchema the schema returned by {@link #getSchema()}
+     * @param contentClaimSchema the schema handed to each {@link ContentClaimFieldMap} created for the
+     *            {@link FlowFileSchema#CONTENT_CLAIM} field
+     */
+    public RepositoryRecordFieldMap(final RepositoryRecord record, final RecordSchema repoRecordSchema, final RecordSchema contentClaimSchema) {
+        this.schema = repoRecordSchema;
+        this.contentClaimSchema = contentClaimSchema;
+        this.record = record;
+        this.flowFile = record.getCurrent();
+    }
+
+    /**
+     * Looks up the value for a repository record or FlowFile field name.
+     *
+     * @param fieldName a field name declared in {@link RepositoryRecordSchema} or {@link FlowFileSchema}
+     * @return the matching value, or {@code null} when the name is not recognized, the record has no
+     *         current content claim, or the record has neither a destination nor an original queue
+     */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case RepositoryRecordSchema.ACTION_TYPE:
+                return record.getType().name();
+            case RepositoryRecordSchema.RECORD_ID:
+                return record.getCurrent().getId();
+            case RepositoryRecordSchema.SWAP_LOCATION:
+                return record.getSwapLocation();
+            case FlowFileSchema.ATTRIBUTES:
+                return flowFile.getAttributes();
+            case FlowFileSchema.ENTRY_DATE:
+                return flowFile.getEntryDate();
+            case FlowFileSchema.FLOWFILE_SIZE:
+                return flowFile.getSize();
+            case FlowFileSchema.LINEAGE_START_DATE:
+                return flowFile.getLineageStartDate();
+            case FlowFileSchema.LINEAGE_START_INDEX:
+                return flowFile.getLineageStartIndex();
+            case FlowFileSchema.QUEUE_DATE:
+                return flowFile.getLastQueueDate();
+            case FlowFileSchema.QUEUE_DATE_INDEX:
+                return flowFile.getQueueDateIndex();
+            case FlowFileSchema.CONTENT_CLAIM:
+                // A new field map is created on every lookup; records without a claim yield null.
+                final ContentClaimFieldMap contentClaimFieldMap = record.getCurrentClaim() == null ? null
+                    : new ContentClaimFieldMap(record.getCurrentClaim(), record.getCurrentClaimOffset(), contentClaimSchema);
+                return contentClaimFieldMap;
+            case RepositoryRecordSchema.QUEUE_IDENTIFIER:
+                // Prefer the destination queue and fall back to the original queue.
+                final FlowFileQueue queue = record.getDestination() == null ? record.getOriginalQueue() : record.getDestination();
+                // Both queues may be absent; report "no value" instead of failing with a NullPointerException.
+                return queue == null ? null : queue.getIdentifier();
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * @return the repository record schema supplied to the constructor
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public String toString() {
+        return "RepositoryRecordFieldMap[" + record + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
new file mode 100644
index 0000000..5887c8a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.apache.nifi.repository.schema.UnionRecordField;
+
+/**
+ * Field names, action names and version 1 schemas for FlowFile repository updates. The top-level
+ * schema holds a single union field whose alternatives are the "Create or Update", "Delete",
+ * "Swap Out" and "Swap In" complex fields built in the static initializer below.
+ */
+public class RepositoryRecordSchema {
+
+    public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository Record Update";  // top level field name
+
+    // repository record fields
+    public static final String ACTION_TYPE = "Action";
+    public static final String RECORD_ID = "Record ID";
+    public static final String QUEUE_IDENTIFIER = "Queue Identifier";
+    public static final String SWAP_LOCATION = "Swap Location";
+
+    // Update types
+    public static final String CREATE_OR_UPDATE_ACTION = "Create or Update";
+    public static final String DELETE_ACTION = "Delete";
+    public static final String SWAP_IN_ACTION = "Swap In";
+    public static final String SWAP_OUT_ACTION = "Swap Out";
+
+    // Schemas assigned in the static initializer: the union wrapper plus one schema per action
+    public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V1;
+    public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V1;
+    public static final RecordSchema DELETE_SCHEMA_V1;
+    public static final RecordSchema SWAP_IN_SCHEMA_V1;
+    public static final RecordSchema SWAP_OUT_SCHEMA_V1;
+
+    // Field instances shared by several of the per-action field lists below. They are declared
+    // before the static block, so they are already initialized when the block runs.
+    public static final RecordField ACTION_TYPE_FIELD = new SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE);
+    public static final RecordField RECORD_ID_FIELD = new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
+
+    static {
+        // Fields for "Create" or "Update" records
+        // Layout: action type, every field of the version 1 FlowFile schema, queue identifier, optional swap location.
+        final List<RecordField> createOrUpdateFields = new ArrayList<>();
+        createOrUpdateFields.add(ACTION_TYPE_FIELD);
+        createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+        createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+        createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE));
+        final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields);
+        CREATE_OR_UPDATE_SCHEMA_V1 = new RecordSchema(createOrUpdateFields);
+
+        // Fields for "Delete" records
+        // Layout: action type and record id only.
+        final List<RecordField> deleteFields = new ArrayList<>();
+        deleteFields.add(ACTION_TYPE_FIELD);
+        deleteFields.add(RECORD_ID_FIELD);
+        final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields);
+        DELETE_SCHEMA_V1 = new RecordSchema(deleteFields);
+
+        // Fields for "Swap Out" records
+        // Layout: action type, record id, queue identifier and a mandatory swap location.
+        final List<RecordField> swapOutFields = new ArrayList<>();
+        swapOutFields.add(ACTION_TYPE_FIELD);
+        swapOutFields.add(RECORD_ID_FIELD);
+        swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+        swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields);
+        SWAP_OUT_SCHEMA_V1 = new RecordSchema(swapOutFields);
+
+        // Fields for "Swap In" records
+        // Starts as a copy of the "Create or Update" list, which already ends with a ZERO_OR_ONE
+        // "Swap Location" field, and then appends an EXACTLY_ONE "Swap Location" field, so this
+        // list contains two fields with that name.
+        // NOTE(review): confirm whether the duplicate is intended; the field list defines the
+        // serialized layout, so it should not be changed without considering already-written data.
+        final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields);
+        swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
+        final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields);
+        SWAP_IN_SCHEMA_V1 = new RecordSchema(swapInFields);
+
+        // Union Field that creates the top-level field type
+        final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn);
+        REPOSITORY_RECORD_SCHEMA_V1 = new RecordSchema(Collections.singletonList(repoUpdateField));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
new file mode 100644
index 0000000..ad51f4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.repository.schema.NamedValue;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.wali.UpdateType;
+
+/**
+ * {@link Record} that presents a {@link RepositoryRecordFieldMap} as the single top-level
+ * {@link RepositoryRecordSchema#REPOSITORY_RECORD_UPDATE_V1} field.
+ * <p>
+ * The value of that field is a {@link NamedValue} that pairs the wrapped field map with the name of
+ * the action chosen from the map's {@link RepositoryRecordSchema#ACTION_TYPE} value.
+ */
+public class RepositoryRecordUpdate implements Record {
+    private final RecordSchema schema;
+    private final RepositoryRecordFieldMap fieldMap;
+
+    /**
+     * @param fieldMap the record that supplies the action type and is handed out as the field value
+     * @param schema the schema returned by {@link #getSchema()}
+     */
+    public RepositoryRecordUpdate(final RepositoryRecordFieldMap fieldMap, final RecordSchema schema) {
+        this.schema = schema;
+        this.fieldMap = fieldMap;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /**
+     * @param fieldName the name of the requested field
+     * @return a {@link NamedValue} naming the action that matches the wrapped record's action type,
+     *         or <code>null</code> if a different field is requested or the action type has no mapping
+     */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        // This record exposes exactly one field; anything else has no value
+        if (!RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1.equals(fieldName)) {
+            return null;
+        }
+
+        final String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
+        switch (UpdateType.valueOf(actionType)) {
+            case CREATE:
+            case UPDATE:
+                // Creates and updates share one action
+                return new NamedValue(RepositoryRecordSchema.CREATE_OR_UPDATE_ACTION, fieldMap);
+            case DELETE:
+                return new NamedValue(RepositoryRecordSchema.DELETE_ACTION, fieldMap);
+            case SWAP_IN:
+                return new NamedValue(RepositoryRecordSchema.SWAP_IN_ACTION, fieldMap);
+            case SWAP_OUT:
+                return new NamedValue(RepositoryRecordSchema.SWAP_OUT_ACTION, fieldMap);
+            default:
+                return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
new file mode 100644
index 0000000..afa19ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+/**
+ * {@link Record} wrapper that exposes the container, section, identifier and loss-tolerance flag
+ * of a {@link ResourceClaim} under the field names declared by {@link ContentClaimSchema}.
+ */
+public class ResourceClaimFieldMap implements Record {
+    private final ResourceClaim resourceClaim;
+    private final RecordSchema schema;
+
+    /**
+     * @param resourceClaim the claim whose properties are exposed as field values
+     * @param schema the schema returned by {@link #getSchema()}
+     */
+    public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) {
+        this.resourceClaim = resourceClaim;
+        this.schema = schema;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /**
+     * @param fieldName one of the {@link ContentClaimSchema} field names
+     * @return the matching property of the wrapped claim, or <code>null</code> for any other name
+     */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case ContentClaimSchema.CLAIM_CONTAINER:
+                return resourceClaim.getContainer();
+            case ContentClaimSchema.CLAIM_SECTION:
+                return resourceClaim.getSection();
+            case ContentClaimSchema.CLAIM_IDENTIFIER:
+                return resourceClaim.getId();
+            case ContentClaimSchema.LOSS_TOLERANT:
+                return resourceClaim.isLossTolerant();
+            default:
+                // Not a field that a Resource Claim provides
+                return null;
+        }
+    }
+
+    /**
+     * Creates a Resource Claim from the container, section, identifier and loss-tolerance values
+     * held by the given record.
+     *
+     * @param record the record to read the claim's fields from
+     * @param claimManager the manager asked to create the claim
+     * @return the claim returned by {@link ResourceClaimManager#newResourceClaim}
+     */
+    public static ResourceClaim getResourceClaim(final Record record, final ResourceClaimManager claimManager) {
+        final String claimContainer = (String) record.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+        final String claimSection = (String) record.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+        final String claimId = (String) record.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+        final Boolean claimLossTolerant = (Boolean) record.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+        return claimManager.newResourceClaim(claimContainer, claimSection, claimId, claimLossTolerant, false);
+    }
+
+    @Override
+    public int hashCode() {
+        final int claimHash = resourceClaim.hashCode();
+        return 91 * claimHash + 41;
+    }
+
+    /**
+     * Two field maps are equal when they are of exactly this class and wrap equal Resource Claims;
+     * the schema does not take part in the comparison.
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != ResourceClaimFieldMap.class) {
+            return false;
+        }
+
+        return resourceClaim.equals(((ResourceClaimFieldMap) obj).resourceClaim);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
new file mode 100644
index 0000000..88e1415
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.FlowFileRecordFieldMap;
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+
+/**
+ * {@link SwapDeserializer} for swap files that begin with the {@link RecordSchema} describing the
+ * record that follows it.
+ */
+public class SchemaSwapDeserializer implements SwapDeserializer {
+
+    /**
+     * Reads the schema and then the single parent record from the stream, converting its
+     * {@link SwapSchema#FLOWFILE_CONTENTS} entries into FlowFiles and its
+     * {@link SwapSchema#SWAP_SUMMARY} entry into the summary.
+     *
+     * @param in stream positioned at the serialized schema
+     * @param swapLocation not consulted by this implementation
+     * @param queue not consulted by this implementation
+     * @param claimManager passed on when restoring the FlowFiles and the summary
+     * @return the FlowFiles and summary held by the swap file
+     * @throws IOException if the schema or the record cannot be read
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
+        final SchemaRecordReader recordReader = SchemaRecordReader.fromSchema(RecordSchema.readFrom(in));
+        final Record swapRecord = recordReader.readRecord(in);
+
+        final List<Record> serializedFlowFiles = (List<Record>) swapRecord.getFieldValue(SwapSchema.FLOWFILE_CONTENTS);
+        final List<FlowFileRecord> restoredFlowFiles = new ArrayList<>(serializedFlowFiles.size());
+        for (final Record serializedFlowFile : serializedFlowFiles) {
+            restoredFlowFiles.add(FlowFileRecordFieldMap.getFlowFile(serializedFlowFile, claimManager));
+        }
+
+        final Record serializedSummary = (Record) swapRecord.getFieldValue(SwapSchema.SWAP_SUMMARY);
+        return new StandardSwapContents(SwapSummaryFieldMap.getSwapSummary(serializedSummary, claimManager), restoredFlowFiles);
+    }
+
+    /**
+     * Reads the schema from the stream and then reads a record using a reader whose schema holds
+     * nothing but the {@link SwapSchema#SWAP_SUMMARY} field.
+     *
+     * @param in stream positioned at the serialized schema
+     * @param swapLocation not consulted by this implementation
+     * @param claimManager passed on when restoring the summary
+     * @return the summary held by the swap file
+     * @throws IOException if the schema or the summary cannot be read
+     */
+    @Override
+    public SwapSummary getSwapSummary(final DataInputStream in, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+        final RecordSchema fileSchema = RecordSchema.readFrom(in);
+
+        // Re-wrap the summary's sub-fields in a schema of their own
+        final List<RecordField> summarySubFields = fileSchema.getField(SwapSchema.SWAP_SUMMARY).getSubFields();
+        final RecordField summaryOnlyField = new ComplexRecordField(SwapSchema.SWAP_SUMMARY, Repetition.EXACTLY_ONE, summarySubFields);
+        final SchemaRecordReader summaryReader = SchemaRecordReader.fromSchema(new RecordSchema(Collections.singletonList(summaryOnlyField)));
+
+        final Record summaryWrapper = summaryReader.readRecord(in);
+        final Record serializedSummary = (Record) summaryWrapper.getFieldValue(SwapSchema.SWAP_SUMMARY);
+        return SwapSummaryFieldMap.getSwapSummary(serializedSummary, claimManager);
+    }
+
+    /**
+     * @return the serialization name declared by {@link SchemaSwapSerializer}
+     */
+    public static String getSerializationName() {
+        return SchemaSwapSerializer.SERIALIZATION_NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
new file mode 100644
index 0000000..195f55a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.schema.FlowFileRecordFieldMap;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+/**
+ * {@link SwapSerializer} that writes {@link SwapSchema#FULL_SWAP_FILE_SCHEMA_V1} to the stream,
+ * followed by one record that holds the swap summary and the swapped FlowFiles.
+ */
+public class SchemaSwapSerializer implements SwapSerializer {
+    static final String SERIALIZATION_NAME = "Schema Swap Serialization";
+
+    private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V1;
+    private final RecordSchema flowFileSchema = new RecordSchema(schema.getField(SwapSchema.FLOWFILE_CONTENTS).getSubFields());
+
+    /**
+     * Writes the schema, then a record made up of a summary (FlowFile count, total content size,
+     * highest FlowFile ID and the Resource Claims in use) and the FlowFiles themselves, and finally
+     * flushes the stream.
+     *
+     * @param toSwap the FlowFiles to write
+     * @param queue the queue whose identifier is recorded with the summary
+     * @param swapLocation not consulted by this implementation
+     * @param out the stream to write to; it is flushed but not closed
+     * @throws IOException if the schema or the record cannot be written
+     */
+    @Override
+    public void serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream out) throws IOException {
+        schema.writeTo(out);
+
+        // Gather the figures that make up the summary
+        long totalContentSize = 0L;
+        long highestFlowFileId = -1L;
+        final List<ResourceClaim> claimsInUse = new ArrayList<>();
+        for (final FlowFileRecord flowFile : toSwap) {
+            totalContentSize += flowFile.getSize();
+            highestFlowFileId = Math.max(highestFlowFileId, flowFile.getId());
+
+            final ContentClaim contentClaim = flowFile.getContentClaim();
+            if (contentClaim != null) {
+                claimsInUse.add(contentClaim.getResourceClaim());
+            }
+        }
+
+        final SwapSummary swapSummary = new StandardSwapSummary(new QueueSize(toSwap.size(), totalContentSize), highestFlowFileId, claimsInUse);
+        final Record summaryRecord = new SwapSummaryFieldMap(swapSummary, queue.getIdentifier(), SwapSchema.SWAP_SUMMARY_SCHEMA_V1);
+
+        final List<Record> flowFileRecords = toSwap.stream()
+            .map(flowFile -> new FlowFileRecordFieldMap(flowFile, flowFileSchema))
+            .collect(Collectors.toList());
+
+        // The parent record has two fields: the summary followed by the FlowFiles
+        final RecordField summaryField = new SimpleRecordField(SwapSchema.SWAP_SUMMARY, FieldType.COMPLEX, Repetition.EXACTLY_ONE);
+        final RecordField contentsField = new ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+
+        final List<RecordField> parentFields = new ArrayList<>(2);
+        parentFields.add(summaryField);
+        parentFields.add(contentsField);
+
+        // LinkedHashMap keeps the values in the same order as the fields
+        final Map<RecordField, Object> parentValues = new LinkedHashMap<>();
+        parentValues.put(summaryField, summaryRecord);
+        parentValues.put(contentsField, flowFileRecords);
+        final Record swapFileRecord = new FieldMapRecord(parentValues, new RecordSchema(parentFields));
+
+        new SchemaRecordWriter().writeRecord(swapFileRecord, out);
+        out.flush();
+    }
+
+    @Override
+    public String getSerializationName() {
+        return SERIALIZATION_NAME;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java
new file mode 100644
index 0000000..b86d9a8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleSwapDeserializer implements SwapDeserializer {
+    public static final int SWAP_ENCODING_VERSION = 10;
+    private static final Logger logger = LoggerFactory.getLogger(SimpleSwapDeserializer.class);
+
+    /**
+     * Reads the header of a swap file and derives the summary by deserializing the FlowFiles that
+     * follow it.
+     *
+     * @param in stream positioned at the encoding version of the swap file
+     * @param swapLocation location of the swap file, used in error and log messages
+     * @param claimManager used to resolve the Resource Claims referenced by the FlowFiles
+     * @return the summary of the swap file, or {@link StandardSwapSummary#EMPTY_SUMMARY} if the file
+     *         declares zero records or ends part-way through the header that follows the encoding version
+     * @throws IOException if the encoding version is newer than {@link #SWAP_ENCODING_VERSION} or the stream cannot be read
+     */
+    @Override
+    public SwapSummary getSwapSummary(final DataInputStream in, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+        final int encodingVersion = in.readInt();
+        if (encodingVersion > SWAP_ENCODING_VERSION) {
+            throw new IOException("Cannot swap FlowFiles in from " + swapLocation + " because the encoding version is "
+                + encodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+        }
+
+        final QueueSize queueSize;
+        Long maxRecordId = null;
+        try {
+            in.readUTF(); // the Connection ID is not needed for the summary
+            final int recordCount = in.readInt();
+            final long totalContentSize = in.readLong();
+            if (recordCount == 0) {
+                return StandardSwapSummary.EMPTY_SUMMARY;
+            }
+
+            // The max record ID is only read for encoding versions above 7
+            if (encodingVersion > 7) {
+                maxRecordId = in.readLong();
+            }
+
+            queueSize = new QueueSize(recordCount, totalContentSize);
+        } catch (final EOFException eof) {
+            logger.warn("Found premature End-of-File when reading Swap File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
+            return StandardSwapSummary.EMPTY_SUMMARY;
+        }
+
+        return deserializeFlowFiles(in, queueSize, maxRecordId, encodingVersion, claimManager, swapLocation).getSummary();
+    }
+
+
+    /**
+     * Reads the header of a swap file, verifies that it belongs to the given queue and then
+     * deserializes the FlowFiles that follow.
+     *
+     * @param in stream positioned at the encoding version of the swap file
+     * @param swapLocation location of the swap file, used in error messages
+     * @param queue the queue that the FlowFiles are being swapped into
+     * @param claimManager used to resolve the Resource Claims referenced by the FlowFiles
+     * @return the FlowFiles and summary held by the swap file
+     * @throws IOException if the encoding version is newer than {@link #SWAP_ENCODING_VERSION} or the stream cannot be read
+     * @throws IllegalArgumentException if the Connection ID stored in the file differs from the queue's identifier
+     * @throws IncompleteSwapFileException if the file ends before the record count, content size and
+     *             (for encoding versions above 7) max record ID have all been read
+     */
+    @Override
+    public SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
+        final int swapEncodingVersion = in.readInt();
+        if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+            // Name the offending file, as getSwapSummary and the connection check below do
+            throw new IOException("Cannot swap FlowFiles in from " + swapLocation + " because the encoding version is "
+                + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+        }
+
+        final String connectionId = in.readUTF(); // Connection ID
+        if (!connectionId.equals(queue.getIdentifier())) {
+            throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation
+                + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
+        }
+
+        int numRecords = 0;
+        long contentSize = 0L;
+        Long maxRecordId = null;
+        try {
+            numRecords = in.readInt();
+            contentSize = in.readLong(); // Content Size
+            if (swapEncodingVersion > 7) {
+                maxRecordId = in.readLong(); // Max Record ID
+            }
+        } catch (final EOFException eof) {
+            // Report whatever part of the header was read, together with an empty list of FlowFiles
+            final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+            final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList());
+            final SwapContents partialContents = new StandardSwapContents(summary, Collections.emptyList());
+            throw new IncompleteSwapFileException(swapLocation, partialContents);
+        }
+
+        final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+        return deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, claimManager, swapLocation);
+    }
+
+    private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId,
+        final int serializationVersion, final ResourceClaimManager claimManager, final String location) throws IOException {
+        final List<FlowFileRecord> flowFiles = new ArrayList<>(queueSize.getObjectCount());
+        final List<ResourceClaim> resourceClaims = new ArrayList<>(queueSize.getObjectCount());
+        Long maxId = maxRecordId;
+
+        for (int i = 0; i < queueSize.getObjectCount(); i++) {
+            try {
+                // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
+                if (serializationVersion < 3) {
+                    final int action = in.read();
+                    if (action != 1) {
+                        throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
+                    }
+                }
+
+                final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+                final long recordId = in.readLong();
+                if (maxId == null || recordId > maxId) {
+                    maxId = recordId;
+                }
+
+                ffBuilder.id(recordId);
+                ffBuilder.entryDate(in.readLong());
+
+                if (serializationVersion > 1) {
+                    // Lineage information was added in version 2
+                    if (serializationVersion < 10) {
+                        final int numLineageIdentifiers = in.readInt();
+                        for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
+                            in.readUTF(); //skip each identifier
+                        }
+                    }
+
+                    // version 9 adds in a 'lineage start index'
+                    final long lineageStartDate = in.readLong();
+                    final long lineageStartIndex;
+                    if (serializationVersion > 8) {
+                        lineageStartIndex = in.readLong();
+                    } else {
+                        lineageStartIndex = 0L;
+                    }
+
+                    ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+                    if (serializationVersion > 5) {
+                        // Version 9 adds in a 'queue date index'
+                        final long lastQueueDate = in.readLong();
+                        final long queueDateIndex;
+                        if (serializationVersion > 8) {
+                            queueDateIndex = in.readLong();
+                        } else {
+                            queueDateIndex = 0L;
+                        }
+
+                        ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+                    }
+                }
+
+                ffBuilder.size(in.readLong());
+
+                if (serializationVersion < 3) {
+                    readString(in); // connection Id
+                }
+
+                final boolean hasClaim = in.readBoolean();
+                ResourceClaim resourceClaim = null;
+                if (hasClaim) {
+                    final String claimId;
+                    if (serializationVersion < 5) {
+                        claimId = String.valueOf(in.readLong());
+                    } else {
+                        claimId = in.readUTF();
+                    }
+
+                    final String container = in.readUTF();
+                    final String section = in.readUTF();
+
+                    final long resourceOffset;
+                    final long resourceLength;
+                    if (serializationVersion < 6) {
+                        resourceOffset = 0L;
+                        resourceLength = -1L;
+                    } else {
+                        resourceOffset = in.readLong();
+                        resourceLength = in.readLong();
+                    }
+
+                    final long claimOffset = in.readLong();
+
+                    final boolean lossTolerant;
+                    if (serializationVersion >= 4) {
+                        lossTolerant = in.readBoolean();
+                    } else {
+                        lossTolerant = false;
+                    }
+
+                    resourceClaim = claimManager.getResourceClaim(container, section, claimId);
+                    if (resourceClaim == null) {
+                        logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
+                            + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
+                            + "ability to properly clean up this resource", container, section, claimId);
+                        resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
+                    }
+
+                    final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
+                    claim.setLength(resourceLength);
+
+                    ffBuilder.contentClaim(claim);
+                    ffBuilder.contentClaimOffset(claimOffset);
+                }
+
+                boolean attributesChanged = true;
+                if (serializationVersion < 3) {
+                    attributesChanged = in.readBoolean();
+                }
+
+                if (attributesChanged) {
+                    final int numAttributes = in.readInt();
+                    for (int j = 0; j < numAttributes; j++) {
+                        final String key = readString(in);
+                        final String value = readString(in);
+
+                        ffBuilder.addAttribute(key, value);
+                    }
+                }
+
+                final FlowFileRecord record = ffBuilder.build();
+                if (resourceClaim != null) {
+                    resourceClaims.add(resourceClaim);
+                }
+
+                flowFiles.add(record);
+            } catch (final EOFException eof) {
+                final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
+                final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles);
+                throw new IncompleteSwapFileException(location, partialContents);
+            }
+        }
+
+        final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
+        return new StandardSwapContents(swapSummary, flowFiles);
+    }
+
+    /**
+     * Reads a length-prefixed, UTF-8 encoded String from the stream.
+     *
+     * @param in the stream to read from
+     * @return the decoded String
+     * @throws EOFException if the stream ends before the length or all of the String's bytes have been read
+     * @throws IOException if the length prefix is negative or the stream cannot be read
+     */
+    private static String readString(final InputStream in) throws IOException {
+        final Integer numBytes = readFieldLength(in);
+        if (numBytes == null) {
+            throw new EOFException();
+        }
+        if (numBytes < 0) {
+            // A four-byte length prefix with its high bit set decodes to a negative int; report the corrupt
+            // data as an I/O failure rather than letting the array allocation throw NegativeArraySizeException
+            throw new IOException("Encountered invalid String length of " + numBytes + " while reading Swap File");
+        }
+        final byte[] bytes = new byte[numBytes];
+        fillBuffer(in, bytes, numBytes);
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Reads the length prefix of a field. The prefix is two bytes, most significant first; if both
+     * bytes are 0xff, the length is instead taken from the four bytes that follow, again most
+     * significant first.
+     *
+     * @param in the stream to read from
+     * @return the length, or <code>null</code> if the stream was already at its end
+     * @throws EOFException if the stream ends part-way through the prefix
+     * @throws IOException if the stream cannot be read
+     */
+    private static Integer readFieldLength(final InputStream in) throws IOException {
+        // Both bytes are read before either is inspected
+        final int highByte = in.read();
+        final int lowByte = in.read();
+        if (highByte < 0) {
+            return null;
+        }
+        if (lowByte < 0) {
+            throw new EOFException();
+        }
+
+        final boolean fourByteLengthFollows = highByte == 0xff && lowByte == 0xff;
+        if (!fourByteLengthFollows) {
+            return (highByte << 8) + lowByte;
+        }
+
+        final int b1 = in.read();
+        final int b2 = in.read();
+        final int b3 = in.read();
+        final int b4 = in.read();
+        // read() yields -1 at end-of-stream, which makes the OR of the four values negative
+        if ((b1 | b2 | b3 | b4) < 0) {
+            throw new EOFException();
+        }
+        return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
+    }
+
+    private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
+        int bytesRead;
+        int totalBytesRead = 0;
+        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
+            totalBytesRead += bytesRead;
+        }
+        if (totalBytesRead != length) {
+            throw new EOFException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java
new file mode 100644
index 0000000..ea8b99b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes swapped-out FlowFiles using a fixed, hand-written binary layout that is tagged
+ * with {@link #SWAP_ENCODING_VERSION}.
+ *
+ * @deprecated in favor of using {@link SchemaSwapSerializer}.
+ */
+@Deprecated
+public class SimpleSwapSerializer implements SwapSerializer {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleSwapSerializer.class);
+    public static final int SWAP_ENCODING_VERSION = 10;
+
+
+    /**
+     * Writes a header (encoding version, queue identifier, record count, total content size and
+     * maximum record id) followed by one entry per FlowFile. Does nothing when {@code toSwap}
+     * is {@code null} or empty. The destination is flushed but not closed.
+     *
+     * @param toSwap the FlowFiles to write
+     * @param queue the queue whose identifier is written into the header
+     * @param swapLocation the swap location, used only in the log message
+     * @param destination the stream to write to
+     * @throws IOException if writing to or flushing the destination fails
+     */
+    @Override
+    public void serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
+        if (toSwap == null || toSwap.isEmpty()) {
+            return;
+        }
+
+        long totalContentSize = 0L;
+        for (final FlowFileRecord flowFile : toSwap) {
+            totalContentSize += flowFile.getSize();
+        }
+
+        // persist record to disk via the swap file
+        final DataOutputStream out = new DataOutputStream(destination);
+        try {
+            out.writeInt(SWAP_ENCODING_VERSION);
+            out.writeUTF(queue.getIdentifier());
+            out.writeInt(toSwap.size());
+            out.writeLong(totalContentSize);
+
+            // the max record id is part of the header so that it is known quickly on restoration
+            out.writeLong(findMaxRecordId(toSwap));
+
+            for (final FlowFileRecord flowFile : toSwap) {
+                writeFlowFile(flowFile, out);
+            }
+        } finally {
+            out.flush();
+        }
+
+        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", toSwap.size(), queue, swapLocation);
+    }
+
+    /**
+     * Returns the largest record id among the given FlowFiles, or 0 if none is greater than 0.
+     */
+    private long findMaxRecordId(final List<FlowFileRecord> flowFiles) {
+        long max = 0L;
+        for (final FlowFileRecord flowFile : flowFiles) {
+            final long id = flowFile.getId();
+            if (id > max) {
+                max = id;
+            }
+        }
+        return max;
+    }
+
+    /**
+     * Writes a single FlowFile: its fixed-width fields, then its content claim, then its attributes.
+     */
+    private void writeFlowFile(final FlowFileRecord flowFile, final DataOutputStream out) throws IOException {
+        out.writeLong(flowFile.getId());
+        out.writeLong(flowFile.getEntryDate());
+        out.writeLong(flowFile.getLineageStartDate());
+        out.writeLong(flowFile.getLineageStartIndex());
+        out.writeLong(flowFile.getLastQueueDate());
+        out.writeLong(flowFile.getQueueDateIndex());
+        out.writeLong(flowFile.getSize());
+
+        writeContentClaim(flowFile, out);
+        writeAttributes(flowFile.getAttributes(), out);
+    }
+
+    /**
+     * Writes a presence flag for the FlowFile's content claim and, when a claim exists, its details.
+     */
+    private void writeContentClaim(final FlowFileRecord flowFile, final DataOutputStream out) throws IOException {
+        final ContentClaim contentClaim = flowFile.getContentClaim();
+        final boolean hasClaim = contentClaim != null;
+        out.writeBoolean(hasClaim);
+        if (!hasClaim) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        out.writeUTF(resourceClaim.getId());
+        out.writeUTF(resourceClaim.getContainer());
+        out.writeUTF(resourceClaim.getSection());
+        out.writeLong(contentClaim.getOffset());
+        out.writeLong(contentClaim.getLength());
+        out.writeLong(flowFile.getContentClaimOffset());
+        out.writeBoolean(resourceClaim.isLossTolerant());
+    }
+
+    /**
+     * Writes the attribute count followed by each key and value as length-prefixed strings.
+     */
+    private void writeAttributes(final Map<String, String> attributes, final DataOutputStream out) throws IOException {
+        out.writeInt(attributes.size());
+        for (final Map.Entry<String, String> attribute : attributes.entrySet()) {
+            writeString(attribute.getKey(), out);
+            writeString(attribute.getValue(), out);
+        }
+    }
+
+    /**
+     * Writes the UTF-8 bytes of the string preceded by a length prefix. Lengths below 65535 use a
+     * two-byte big-endian prefix; longer strings use the marker bytes 255, 255 followed by a
+     * four-byte big-endian length.
+     */
+    private void writeString(final String toWrite, final OutputStream out) throws IOException {
+        final byte[] encoded = toWrite.getBytes(StandardCharsets.UTF_8);
+        final int encodedLength = encoded.length;
+
+        if (encodedLength >= 65535) {
+            // marker plus the two high-order bytes of the four-byte length
+            out.write(255);
+            out.write(255);
+            out.write(encodedLength >>> 24);
+            out.write(encodedLength >>> 16);
+        }
+
+        // the two low-order length bytes and the payload are common to both forms
+        out.write(encodedLength >>> 8);
+        out.write(encodedLength);
+        out.write(encoded);
+    }
+
+    @Override
+    public String getSerializationName() {
+        return "Simple Swap Serializer";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java
new file mode 100644
index 0000000..a3fb30a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
+/**
+ * Contract for reading swap file data from a {@link DataInputStream}.
+ * NOTE(review): presumably the reading counterpart of {@link SwapSerializer}; confirm against implementations.
+ */
+public interface SwapDeserializer {
+
+    /**
+     * Reads the given stream and returns its data as a {@link SwapContents}.
+     *
+     * @param in the stream to read from
+     * @param swapLocation the location of the swap file being read (presumably for identification/logging; verify)
+     * @param queue the queue associated with the swap file
+     * @param claimManager the resource claim manager made available to the implementation
+     * @return the contents read from the stream
+     * @throws IOException if the data cannot be read
+     */
+    SwapContents deserializeFlowFiles(DataInputStream in, String swapLocation, FlowFileQueue queue, ResourceClaimManager claimManager) throws IOException;
+
+    /**
+     * Reads the given stream and returns its data as a {@link SwapSummary}.
+     *
+     * @param in the stream to read from
+     * @param swapLocation the location of the swap file being read (presumably for identification/logging; verify)
+     * @param claimManager the resource claim manager made available to the implementation
+     * @return the summary read from the stream
+     * @throws IOException if the data cannot be read
+     */
+    SwapSummary getSwapSummary(DataInputStream in, String swapLocation, ResourceClaimManager claimManager) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
new file mode 100644
index 0000000..70fb539
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+/**
+ * Declares the version 1 {@link RecordSchema}s used for swap files, together with the names of
+ * the fields that appear in them.
+ */
+public class SwapSchema {
+
+    public static final RecordSchema SWAP_SUMMARY_SCHEMA_V1;
+    public static final RecordSchema SWAP_CONTENTS_SCHEMA_V1;
+    public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V1;
+
+    public static final String RESOURCE_CLAIMS = "Resource Claims";
+    public static final String RESOURCE_CLAIM = "Resource Claim";
+    public static final String RESOURCE_CLAIM_COUNT = "Claim Count";
+
+    public static final String QUEUE_IDENTIFIER = "Queue Identifier";
+    public static final String FLOWFILE_COUNT = "FlowFile Count";
+    public static final String FLOWFILE_SIZE = "FlowFile Size";
+    public static final String MAX_RECORD_ID = "Max Record ID";
+    public static final String SWAP_SUMMARY = "Swap Summary";
+    public static final String FLOWFILE_CONTENTS = "FlowFiles";
+
+
+    static {
+        // Summary: queue identifier, FlowFile count, FlowFile size, max record id, resource claim counts.
+        final List<RecordField> summary = new ArrayList<>();
+        summary.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+        summary.add(new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE));
+        summary.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE));
+        summary.add(new SimpleRecordField(MAX_RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE));
+        summary.add(newResourceClaimsField());
+        SWAP_SUMMARY_SCHEMA_V1 = new RecordSchema(summary);
+
+        // Contents: a single repeated FlowFile field.
+        SWAP_CONTENTS_SCHEMA_V1 = new RecordSchema(Collections.singletonList(newFlowFilesField()));
+
+        // Full file: the summary nested as one complex field, followed by the repeated FlowFile field.
+        final List<RecordField> fullFile = new ArrayList<>();
+        fullFile.add(new ComplexRecordField(SWAP_SUMMARY, Repetition.EXACTLY_ONE, summary));
+        fullFile.add(newFlowFilesField());
+        FULL_SWAP_FILE_SCHEMA_V1 = new RecordSchema(fullFile);
+    }
+
+    /**
+     * Builds the map field whose keys are resource claims and whose values are integer claim counts.
+     */
+    private static RecordField newResourceClaimsField() {
+        final RecordField claimKey = new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, ContentClaimSchema.RESOURCE_CLAIM_SCHEMA_V1.getFields());
+        final RecordField countValue = new SimpleRecordField(RESOURCE_CLAIM_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
+        return new MapRecordField(RESOURCE_CLAIMS, claimKey, countValue, Repetition.EXACTLY_ONE);
+    }
+
+    /**
+     * Builds a new zero-or-more complex field holding FlowFiles described by the version 1 FlowFile schema.
+     */
+    private static RecordField newFlowFilesField() {
+        return new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java
new file mode 100644
index 0000000..e8439e6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+/**
+ * Contract for writing a list of FlowFiles to an {@link OutputStream} in a swap file format.
+ */
+public interface SwapSerializer {
+
+    /**
+     * Writes the given FlowFiles to the destination stream.
+     *
+     * @param toSwap the FlowFiles to write
+     * @param queue the queue associated with the FlowFiles being written
+     * @param swapLocation the location of the swap file being written
+     * @param destination the stream to write to
+     * @throws IOException if the FlowFiles cannot be written
+     */
+    void serializeFlowFiles(List<FlowFileRecord> toSwap, FlowFileQueue queue, String swapLocation, OutputStream destination) throws IOException;
+
+    /**
+     * @return a name for this serialization format
+     */
+    String getSerializationName();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
new file mode 100644
index 0000000..ab58ed6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.ResourceClaimFieldMap;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+/**
+ * Exposes a {@link SwapSummary} and its queue identifier as a schema-backed {@link Record}, and
+ * converts such a record back into a {@link SwapSummary}.
+ */
+public class SwapSummaryFieldMap implements Record {
+    private final SwapSummary swapSummary;
+    private final RecordSchema schema;
+    private final String queueIdentifier;
+    private final Map<ResourceClaimFieldMap, Integer> claimCounts;
+
+    /**
+     * @param summary the summary to expose
+     * @param queueIdentifier the value returned for the queue identifier field
+     * @param schema the schema of this record; it must contain the resource claims field
+     */
+    public SwapSummaryFieldMap(final SwapSummary summary, final String queueIdentifier, final RecordSchema schema) {
+        this.swapSummary = summary;
+        this.queueIdentifier = queueIdentifier;
+        this.schema = schema;
+        this.claimCounts = countClaims(summary, schema);
+    }
+
+    /**
+     * Wraps every resource claim of the summary in a {@link ResourceClaimFieldMap} and counts how
+     * often each wrapped claim occurs.
+     */
+    private static Map<ResourceClaimFieldMap, Integer> countClaims(final SwapSummary summary, final RecordSchema schema) {
+        // The first sub-field of the resource claims field supplies the fields of the claim schema.
+        final RecordField claimField = schema.getField(SwapSchema.RESOURCE_CLAIMS).getSubFields().get(0);
+        final RecordSchema claimSchema = new RecordSchema(claimField.getSubFields());
+
+        final List<ResourceClaim> claims = summary.getResourceClaims();
+        final Map<ResourceClaimFieldMap, Integer> counts = new HashMap<>();
+        for (final ResourceClaim claim : claims) {
+            final ResourceClaimFieldMap key = new ResourceClaimFieldMap(claim, claimSchema);
+            final Integer previous = counts.get(key);
+            counts.put(key, previous == null ? 1 : previous + 1);
+        }
+        return counts;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    /**
+     * Returns the value for one of the swap summary field names declared in {@link SwapSchema},
+     * or {@code null} for any other name.
+     */
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        if (fieldName.equals(SwapSchema.MAX_RECORD_ID)) {
+            return swapSummary.getMaxFlowFileId();
+        }
+        if (fieldName.equals(SwapSchema.FLOWFILE_COUNT)) {
+            return swapSummary.getQueueSize().getObjectCount();
+        }
+        if (fieldName.equals(SwapSchema.FLOWFILE_SIZE)) {
+            return swapSummary.getQueueSize().getByteCount();
+        }
+        if (fieldName.equals(SwapSchema.QUEUE_IDENTIFIER)) {
+            return queueIdentifier;
+        }
+        if (fieldName.equals(SwapSchema.RESOURCE_CLAIMS)) {
+            return claimCounts;
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds a {@link SwapSummary} from a record holding the swap summary fields. Each resource
+     * claim is added to the result as many times as its recorded count.
+     *
+     * @param record the record to read the summary fields from
+     * @param claimManager the claim manager passed to {@link ResourceClaimFieldMap#getResourceClaim}
+     * @return the reconstructed summary
+     */
+    @SuppressWarnings("unchecked")
+    public static SwapSummary getSwapSummary(final Record record, final ResourceClaimManager claimManager) {
+        final int objectCount = (Integer) record.getFieldValue(SwapSchema.FLOWFILE_COUNT);
+        final long byteCount = (Long) record.getFieldValue(SwapSchema.FLOWFILE_SIZE);
+        final QueueSize queueSize = new QueueSize(objectCount, byteCount);
+
+        final long maxFlowFileId = (Long) record.getFieldValue(SwapSchema.MAX_RECORD_ID);
+
+        final Map<Record, Integer> countsByClaimRecord = (Map<Record, Integer>) record.getFieldValue(SwapSchema.RESOURCE_CLAIMS);
+        final List<ResourceClaim> resourceClaims = new ArrayList<>();
+        for (final Map.Entry<Record, Integer> claimEntry : countsByClaimRecord.entrySet()) {
+            final ResourceClaim claim = ResourceClaimFieldMap.getResourceClaim(claimEntry.getKey(), claimManager);
+
+            int remaining = claimEntry.getValue();
+            while (remaining > 0) {
+                resourceClaims.add(claim);
+                remaining--;
+            }
+        }
+
+        return new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims);
+    }
+}