You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/20 05:09:43 UTC

[3/5] nifi git commit: NIFI-3273: Added nifi-toolkit-flowfile-repo that contains a simple Java class that is capable of recovering a FlowFile Repository manually in the case of an operating system crash that results in trailing 0's being dumped into the ... [subject line truncated]

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
new file mode 100644
index 0000000..afa19ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ResourceClaimFieldMap implements Record {
+    private final ResourceClaim resourceClaim;
+    private final RecordSchema schema;
+
+    public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) {
+        this.resourceClaim = resourceClaim;
+        this.schema = schema;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public Object getFieldValue(final String fieldName) {
+        switch (fieldName) {
+            case ContentClaimSchema.CLAIM_CONTAINER:
+                return resourceClaim.getContainer();
+            case ContentClaimSchema.CLAIM_SECTION:
+                return resourceClaim.getSection();
+            case ContentClaimSchema.CLAIM_IDENTIFIER:
+                return resourceClaim.getId();
+            case ContentClaimSchema.LOSS_TOLERANT:
+                return resourceClaim.isLossTolerant();
+        }
+
+        return null;
+    }
+
+    public static ResourceClaim getResourceClaim(final Record record, final ResourceClaimManager claimManager) {
+        final String container = (String) record.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+        final String section = (String) record.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+        final String identifier = (String) record.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+        final Boolean lossTolerant = (Boolean) record.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+        return claimManager.newResourceClaim(container, section, identifier, lossTolerant, false);
+    }
+
+    @Override
+    public int hashCode() {
+        return 41 + 91 * resourceClaim.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj.getClass() != ResourceClaimFieldMap.class) {
+            return false;
+        }
+
+        final ResourceClaimFieldMap other = (ResourceClaimFieldMap) obj;
+        return resourceClaim.equals(other.resourceClaim);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
new file mode 100644
index 0000000..59b0e7b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SchemaRepositoryRecordSerdeTest {
+    public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier";
+    private StandardResourceClaimManager resourceClaimManager;
+    private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde;
+    private Map<String, FlowFileQueue> queueMap;
+    private FlowFileQueue flowFileQueue;
+    private ByteArrayOutputStream byteArrayOutputStream;
+    private DataOutputStream dataOutputStream;
+
+    @Before
+    public void setup() {
+        resourceClaimManager = new StandardResourceClaimManager();
+        schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager);
+        queueMap = new HashMap<>();
+        schemaRepositoryRecordSerde.setQueueMap(queueMap);
+        flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER);
+        byteArrayOutputStream = new ByteArrayOutputStream();
+        dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    }
+
+    @After
+    public void teardown() {
+        resourceClaimManager.purge();
+    }
+
+    @Test
+    public void testV1CreateCantHandleLongAttributeName() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1CreateCantHandleLongAttributeValue() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2CreateCanHandleLongAttributeName() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2CreateCanHandleLongAttributeValue() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testRoundTripCreateV1ToV2() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("testName", "testValue");
+        schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream,
+                RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1SwapInCantHandleLongAttributeName() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV1SwapInCantHandleLongAttributeValue() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2SwapInCanHandleLongAttributeName() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put(stringBuilder.toString(), "testValue");
+        StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testV2SwapInCanHandleLongAttributeValue() throws IOException {
+        schemaRepositoryRecordSerde.writeHeader(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < 65536; i++) {
+            stringBuilder.append('a');
+        }
+        attributes.put("testName", stringBuilder.toString());
+        StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+    }
+
+    @Test
+    public void testRoundTripSwapInV1ToV2() throws IOException {
+        RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream);
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("testName", "testValue");
+        StandardRepositoryRecord record = createCreateFlowFileRecord(attributes);
+        record.setSwapLocation("fake");
+        assertEquals(SWAP_IN, record.getType());
+        schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1);
+
+        DataInputStream dataInputStream = createDataInputStream();
+        schemaRepositoryRecordSerde.readHeader(dataInputStream);
+        RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2);
+        assertEquals(attributes, repositoryRecord.getCurrent().getAttributes());
+        assertEquals(SWAP_IN, repositoryRecord.getType());
+    }
+
+    private DataInputStream createDataInputStream() throws IOException {
+        dataOutputStream.flush();
+        return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+    }
+
+    private StandardRepositoryRecord createCreateFlowFileRecord(Map<String, String> attributes) {
+        StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue);
+        StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder();
+        flowFileRecordBuilder.addAttributes(attributes);
+        standardRepositoryRecord.setWorking(flowFileRecordBuilder.build());
+        return standardRepositoryRecord;
+    }
+
+    private FlowFileQueue createMockQueue(String identifier) {
+        FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
+        when(flowFileQueue.getIdentifier()).thenReturn(identifier);
+        queueMap.put(identifier, flowFileQueue);
+        return flowFileQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 54d777f..6395e6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -57,6 +57,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-repository-models</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-properties</artifactId>
         </dependency>
         <dependency>
@@ -136,6 +140,10 @@
             <artifactId>nifi-write-ahead-log</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-flowfile-repo-serialization</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
deleted file mode 100644
index 44ed62d..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> {
-    private Map<String, FlowFileQueue> flowFileQueueMap = null;
-
-    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
-        this.flowFileQueueMap = queueMap;
-    }
-
-    protected Map<String, FlowFileQueue> getQueueMap() {
-        return flowFileQueueMap;
-    }
-
-    protected FlowFileQueue getFlowFileQueue(final String queueId) {
-        return flowFileQueueMap.get(queueId);
-    }
-
-    @Override
-    public Long getRecordIdentifier(final RepositoryRecord record) {
-        return record.getCurrent().getId();
-    }
-
-    @Override
-    public UpdateType getUpdateType(final RepositoryRecord record) {
-        switch (record.getType()) {
-            case CONTENTMISSING:
-            case DELETE:
-                return UpdateType.DELETE;
-            case CREATE:
-                return UpdateType.CREATE;
-            case UPDATE:
-                return UpdateType.UPDATE;
-            case SWAP_OUT:
-                return UpdateType.SWAP_OUT;
-            case SWAP_IN:
-                return UpdateType.SWAP_IN;
-        }
-        return null;
-    }
-
-    @Override
-    public String getLocation(final RepositoryRecord record) {
-        return record.getSwapLocation();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
deleted file mode 100644
index c19fa94..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.wali.SerDe;
-import org.wali.SerDeFactory;
-import org.wali.UpdateType;
-
-public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> {
-    private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
-    private final ResourceClaimManager resourceClaimManager;
-    private Map<String, FlowFileQueue> flowFileQueueMap = null;
-
-    public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
-        this.resourceClaimManager = claimManager;
-    }
-
-    protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
-        this.flowFileQueueMap = queueMap;
-    }
-
-    protected Map<String, FlowFileQueue> getQueueMap() {
-        return flowFileQueueMap;
-    }
-
-    @Override
-    public SerDe<RepositoryRecord> createSerDe(final String encodingName) {
-        if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
-            final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
-            serde.setQueueMap(flowFileQueueMap);
-            return serde;
-        }
-
-        if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName)
-            || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) {
-            final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager);
-            serde.setQueueMap(flowFileQueueMap);
-            return serde;
-        }
-
-        throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known");
-    }
-
-    protected FlowFileQueue getFlowFileQueue(final String queueId) {
-        return flowFileQueueMap.get(queueId);
-    }
-
-    @Override
-    public Long getRecordIdentifier(final RepositoryRecord record) {
-        return record.getCurrent().getId();
-    }
-
-    @Override
-    public UpdateType getUpdateType(final RepositoryRecord record) {
-        switch (record.getType()) {
-            case CONTENTMISSING:
-            case DELETE:
-                return UpdateType.DELETE;
-            case CREATE:
-                return UpdateType.CREATE;
-            case UPDATE:
-                return UpdateType.UPDATE;
-            case SWAP_OUT:
-                return UpdateType.SWAP_OUT;
-            case SWAP_IN:
-                return UpdateType.SWAP_IN;
-        }
-        return null;
-    }
-
-    @Override
-    public String getLocation(final RepositoryRecord record) {
-        return record.getSwapLocation();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
deleted file mode 100644
index 221f8ce..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap;
-import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
-import org.apache.nifi.controller.repository.schema.FlowFileSchema;
-import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
-import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
-import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
-import org.apache.nifi.repository.schema.FieldType;
-import org.apache.nifi.repository.schema.Record;
-import org.apache.nifi.repository.schema.RecordSchema;
-import org.apache.nifi.repository.schema.Repetition;
-import org.apache.nifi.repository.schema.SchemaRecordReader;
-import org.apache.nifi.repository.schema.SchemaRecordWriter;
-import org.apache.nifi.repository.schema.SimpleRecordField;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
-    private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
-    private static final int MAX_ENCODING_VERSION = 2;
-
-    private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2;
-    private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
-
-    private final ResourceClaimManager resourceClaimManager;
-    private volatile RecordSchema recoverySchema;
-
-    public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
-        this.resourceClaimManager = resourceClaimManager;
-    }
-
-    @Override
-    public void writeHeader(final DataOutputStream out) throws IOException {
-        writeSchema.writeTo(out);
-    }
-
-    @Override
-    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException {
-        serializeRecord(newRecordState, out);
-    }
-
-    @Override
-    public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
-        final RecordSchema schema;
-        switch (record.getType()) {
-            case CREATE:
-            case UPDATE:
-                schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2;
-                break;
-            case CONTENTMISSING:
-            case DELETE:
-                schema = RepositoryRecordSchema.DELETE_SCHEMA_V2;
-                break;
-            case SWAP_IN:
-                schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2;
-                break;
-            case SWAP_OUT:
-                schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2;
-                break;
-            default:
-                throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen.
-        }
-
-        serializeRecord(record, out, schema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2);
-    }
-
-
-    protected void serializeRecord(final RepositoryRecord record, final DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) throws IOException {
-        final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
-        final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, repositoryRecordSchema);
-        new SchemaRecordWriter().writeRecord(update, out);
-    }
-
-    @Override
-    public void readHeader(final DataInputStream in) throws IOException {
-        recoverySchema = RecordSchema.readFrom(in);
-    }
-
-    @Override
-    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
-        return deserializeRecord(in, version);
-    }
-
-    @Override
-    public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
-        final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
-        final Record updateRecord = reader.readRecord(in);
-        if (updateRecord == null) {
-            // null may be returned by reader.readRecord() if it encounters end-of-stream
-            return null;
-        }
-
-        // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
-        // top level that indicates which type of record we have.
-        final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2);
-
-        final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
-        final UpdateType updateType = UpdateType.valueOf(actionType);
-        switch (updateType) {
-            case CREATE:
-                return createRecord(record);
-            case DELETE:
-                return deleteRecord(record);
-            case SWAP_IN:
-                return swapInRecord(record);
-            case SWAP_OUT:
-                return swapOutRecord(record);
-            case UPDATE:
-                return updateRecord(record);
-            default:
-                throw new IOException("Found unrecognized Update Type '" + actionType + "'");
-        }
-    }
-
-
-    @SuppressWarnings("unchecked")
-    private StandardRepositoryRecord createRecord(final Record record) {
-        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-        ffBuilder.id((Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID));
-        ffBuilder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE));
-
-        final Long lastQueueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE);
-        final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX);
-        ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-
-        final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE);
-        final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX);
-        ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
-        populateContentClaim(ffBuilder, record);
-        ffBuilder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE));
-
-        ffBuilder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES));
-
-        final FlowFileRecord flowFileRecord = ffBuilder.build();
-
-        final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
-        final FlowFileQueue queue = getFlowFileQueue(queueId);
-
-        final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(queue, flowFileRecord);
-        requireFlowFileQueue(repoRecord, queueId);
-        return repoRecord;
-    }
-
-    private void requireFlowFileQueue(final StandardRepositoryRecord repoRecord, final String queueId) {
-        if (queueId == null || queueId.trim().isEmpty()) {
-            logger.warn("{} does not have a Queue associated with it; this record will be discarded", repoRecord.getCurrent());
-            repoRecord.markForAbort();
-        } else if (repoRecord.getOriginalQueue() == null) {
-            logger.warn("{} maps to unknown Queue {}; this record will be discarded", repoRecord.getCurrent(), queueId);
-            repoRecord.markForAbort();
-        }
-    }
-
-    private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) {
-        final Object claimMap = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
-        if (claimMap == null) {
-            return;
-        }
-
-        final Record claimRecord = (Record) claimMap;
-        final ContentClaim contentClaim = ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager);
-        final Long offset = ContentClaimFieldMap.getContentClaimOffset(claimRecord);
-
-        ffBuilder.contentClaim(contentClaim);
-        ffBuilder.contentClaimOffset(offset);
-    }
-
-    private RepositoryRecord updateRecord(final Record record) {
-        return createRecord(record);
-    }
-
-    private RepositoryRecord deleteRecord(final Record record) {
-        final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
-        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-        final FlowFileRecord flowFileRecord = ffBuilder.build();
-
-        final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
-        repoRecord.markForDelete();
-        return repoRecord;
-    }
-
-    private RepositoryRecord swapInRecord(final Record record) {
-        final StandardRepositoryRecord repoRecord = createRecord(record);
-        final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
-        repoRecord.setSwapLocation(swapLocation);
-
-        final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
-        requireFlowFileQueue(repoRecord, queueId);
-        return repoRecord;
-    }
-
-    private RepositoryRecord swapOutRecord(final Record record) {
-        final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
-        final String queueId = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
-        final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
-        final FlowFileQueue queue = getFlowFileQueue(queueId);
-
-        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-            .id(recordId)
-            .build();
-
-        return new StandardRepositoryRecord(queue, flowFileRecord, swapLocation);
-    }
-
-    @Override
-    public int getVersion() {
-        return MAX_ENCODING_VERSION;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
deleted file mode 100644
index a1d5173..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.builder.CompareToBuilder;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-/**
- * <p>
- * A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content.
- * </p>
- *
- * <b>Immutable - Thread Safe</b>
- *
- */
-public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
-
-    private final long id;
-    private final long entryDate;
-    private final long lineageStartDate;
-    private final long lineageStartIndex;
-    private final long size;
-    private final long penaltyExpirationMs;
-    private final Map<String, String> attributes;
-    private final ContentClaim claim;
-    private final long claimOffset;
-    private final long lastQueueDate;
-    private final long queueDateIndex;
-
-    private StandardFlowFileRecord(final Builder builder) {
-        this.id = builder.bId;
-        this.attributes = builder.bAttributes == null ? Collections.emptyMap() : builder.bAttributes;
-        this.entryDate = builder.bEntryDate;
-        this.lineageStartDate = builder.bLineageStartDate;
-        this.lineageStartIndex = builder.bLineageStartIndex;
-        this.penaltyExpirationMs = builder.bPenaltyExpirationMs;
-        this.size = builder.bSize;
-        this.claim = builder.bClaim;
-        this.claimOffset = builder.bClaimOffset;
-        this.lastQueueDate = builder.bLastQueueDate;
-        this.queueDateIndex = builder.bQueueDateIndex;
-    }
-
-    @Override
-    public long getId() {
-        return id;
-    }
-
-    @Override
-    public long getEntryDate() {
-        return entryDate;
-    }
-
-    @Override
-    public long getLineageStartDate() {
-        return lineageStartDate;
-    }
-
-    @Override
-    public Long getLastQueueDate() {
-        return lastQueueDate;
-    }
-
-    @Override
-    public boolean isPenalized() {
-        return penaltyExpirationMs > 0 ? penaltyExpirationMs > System.currentTimeMillis() : false;
-    }
-
-    @Override
-    public String getAttribute(final String key) {
-        return attributes.get(key);
-    }
-
-    @Override
-    public long getSize() {
-        return size;
-    }
-
-    @Override
-    public Map<String, String> getAttributes() {
-        return Collections.unmodifiableMap(this.attributes);
-    }
-
-    @Override
-    public ContentClaim getContentClaim() {
-        return this.claim;
-    }
-
-    @Override
-    public long getContentClaimOffset() {
-        return this.claimOffset;
-    }
-
-    @Override
-    public long getLineageStartIndex() {
-        return lineageStartIndex;
-    }
-
-    @Override
-    public long getQueueDateIndex() {
-        return queueDateIndex;
-    }
-
-    /**
-     * Provides the natural ordering for FlowFile objects which is based on their identifier.
-     *
-     * @param other other
-     * @return standard compare contract
-     */
-    @Override
-    public int compareTo(final FlowFile other) {
-        return new CompareToBuilder().append(id, other.getId()).toComparison();
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FlowFile)) {
-            return false;
-        }
-        final FlowFile otherRecord = (FlowFile) other;
-        return new EqualsBuilder().append(id, otherRecord.getId()).isEquals();
-    }
-
-    @Override
-    public String toString() {
-        final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
-        builder.append("uuid", getAttribute(CoreAttributes.UUID.key()));
-        builder.append("claim", claim == null ? "" : claim.toString());
-        builder.append("offset", claimOffset);
-        builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size);
-        return builder.toString();
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder(7, 13).append(id).toHashCode();
-    }
-
-    public static final class Builder {
-
-        private long bId;
-        private long bEntryDate = System.currentTimeMillis();
-        private long bLineageStartDate = bEntryDate;
-        private long bLineageStartIndex = 0L;
-        private final Set<String> bLineageIdentifiers = new HashSet<>();
-        private long bPenaltyExpirationMs = -1L;
-        private long bSize = 0L;
-        private ContentClaim bClaim = null;
-        private long bClaimOffset = 0L;
-        private long bLastQueueDate = System.currentTimeMillis();
-        private long bQueueDateIndex = 0L;
-        private Map<String, String> bAttributes;
-        private boolean bAttributesCopied = false;
-
-        public Builder id(final long id) {
-            bId = id;
-            return this;
-        }
-
-        public Builder entryDate(final long epochMs) {
-            bEntryDate = epochMs;
-            return this;
-        }
-
-        public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) {
-            bLineageStartDate = lineageStartDate;
-            bLineageStartIndex = lineageStartIndex;
-            return this;
-        }
-
-        public Builder penaltyExpirationTime(final long epochMilliseconds) {
-            bPenaltyExpirationMs = epochMilliseconds;
-            return this;
-        }
-
-        public Builder size(final long bytes) {
-            if (bytes >= 0) {
-                bSize = bytes;
-            }
-            return this;
-        }
-
-        private Map<String, String> initializeAttributes() {
-            if (bAttributes == null) {
-                bAttributes = new HashMap<>();
-                bAttributesCopied = true;
-            } else if (!bAttributesCopied) {
-                bAttributes = new HashMap<>(bAttributes);
-                bAttributesCopied = true;
-            }
-
-            return bAttributes;
-        }
-
-        public Builder addAttribute(final String key, final String value) {
-            if (key != null && value != null) {
-                initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value);
-            }
-            return this;
-        }
-
-        public Builder addAttributes(final Map<String, String> attributes) {
-            final Map<String, String> initializedAttributes = initializeAttributes();
-
-            if (null != attributes) {
-                for (final String key : attributes.keySet()) {
-                    FlowFile.KeyValidator.validateKey(key);
-                }
-                for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                    final String key = entry.getKey();
-                    final String value = entry.getValue();
-                    if (key != null && value != null) {
-                        initializedAttributes.put(key, value);
-                    }
-                }
-            }
-            return this;
-        }
-
-        public Builder removeAttributes(final String... keys) {
-            if (keys != null) {
-                for (final String key : keys) {
-                    if (CoreAttributes.UUID.key().equals(key)) {
-                        continue;
-                    }
-
-                    initializeAttributes().remove(key);
-                }
-            }
-            return this;
-        }
-
-        public Builder removeAttributes(final Set<String> keys) {
-            if (keys != null) {
-                for (final String key : keys) {
-                    if (CoreAttributes.UUID.key().equals(key)) {
-                        continue;
-                    }
-
-                    initializeAttributes().remove(key);
-                }
-            }
-            return this;
-        }
-
-        public Builder removeAttributes(final Pattern keyPattern) {
-            if (keyPattern != null) {
-                final Iterator<String> iterator = initializeAttributes().keySet().iterator();
-                while (iterator.hasNext()) {
-                    final String key = iterator.next();
-
-                    if (CoreAttributes.UUID.key().equals(key)) {
-                        continue;
-                    }
-
-                    if (keyPattern.matcher(key).matches()) {
-                        iterator.remove();
-                    }
-                }
-            }
-            return this;
-        }
-
-        public Builder contentClaim(final ContentClaim claim) {
-            this.bClaim = claim;
-            return this;
-        }
-
-        public Builder contentClaimOffset(final long offset) {
-            this.bClaimOffset = offset;
-            return this;
-        }
-
-        public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) {
-            this.bLastQueueDate = lastQueueDate;
-            this.bQueueDateIndex = queueDateIndex;
-            return this;
-        }
-
-        public Builder fromFlowFile(final FlowFileRecord specFlowFile) {
-            if (specFlowFile == null) {
-                return this;
-            }
-            bId = specFlowFile.getId();
-            bEntryDate = specFlowFile.getEntryDate();
-            bLineageStartDate = specFlowFile.getLineageStartDate();
-            bLineageStartIndex = specFlowFile.getLineageStartIndex();
-            bLineageIdentifiers.clear();
-            bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
-            bSize = specFlowFile.getSize();
-            bAttributes = specFlowFile.getAttributes();
-            bAttributesCopied = false;
-            bClaim = specFlowFile.getContentClaim();
-            bClaimOffset = specFlowFile.getContentClaimOffset();
-            bLastQueueDate = specFlowFile.getLastQueueDate();
-            bQueueDateIndex = specFlowFile.getQueueDateIndex();
-
-            return this;
-        }
-
-        public FlowFileRecord build() {
-            return new StandardFlowFileRecord(this);
-        }
-    }
-
-    @Override
-    public long getPenaltyExpirationMillis() {
-        return penaltyExpirationMs;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
deleted file mode 100644
index 8aa1caf..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.processor.Relationship;
-
-public class StandardRepositoryRecord implements RepositoryRecord {
-
-    private RepositoryRecordType type = null;
-    private FlowFileRecord workingFlowFileRecord = null;
-    private Relationship transferRelationship = null;
-    private FlowFileQueue destination = null;
-    private final FlowFileRecord originalFlowFileRecord;
-    private final FlowFileQueue originalQueue;
-    private String swapLocation;
-    private final Map<String, String> updatedAttributes = new HashMap<>();
-    private final Map<String, String> originalAttributes;
-    private List<ContentClaim> transientClaims;
-
-    /**
-     * Creates a new record which has no original claim or flow file - it is entirely new
-     *
-     * @param originalQueue queue
-     */
-    public StandardRepositoryRecord(final FlowFileQueue originalQueue) {
-        this(originalQueue, null);
-        this.type = RepositoryRecordType.CREATE;
-    }
-
-    /**
-     * Creates a record based on given original items
-     *
-     * @param originalQueue queue
-     * @param originalFlowFileRecord record
-     */
-    public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) {
-        this(originalQueue, originalFlowFileRecord, null);
-        this.type = RepositoryRecordType.UPDATE;
-    }
-
-    public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord, final String swapLocation) {
-        this.originalQueue = originalQueue;
-        this.originalFlowFileRecord = originalFlowFileRecord;
-        this.type = RepositoryRecordType.SWAP_OUT;
-        this.swapLocation = swapLocation;
-        this.originalAttributes = originalFlowFileRecord == null ? Collections.<String, String>emptyMap() : originalFlowFileRecord.getAttributes();
-    }
-
-    @Override
-    public FlowFileQueue getDestination() {
-        return destination;
-    }
-
-    public void setDestination(final FlowFileQueue destination) {
-        this.destination = destination;
-    }
-
-    @Override
-    public RepositoryRecordType getType() {
-        return type;
-    }
-
-    FlowFileRecord getOriginal() {
-        return originalFlowFileRecord;
-    }
-
-    @Override
-    public String getSwapLocation() {
-        return swapLocation;
-    }
-
-    public void setSwapLocation(final String swapLocation) {
-        this.swapLocation = swapLocation;
-        if (type != RepositoryRecordType.SWAP_OUT) {
-            type = RepositoryRecordType.SWAP_IN; // we are swapping in a new record
-        }
-    }
-
-    @Override
-    public ContentClaim getOriginalClaim() {
-        return (originalFlowFileRecord == null) ? null : originalFlowFileRecord.getContentClaim();
-    }
-
-    @Override
-    public FlowFileQueue getOriginalQueue() {
-        return originalQueue;
-    }
-
-    public void setWorking(final FlowFileRecord flowFile) {
-        workingFlowFileRecord = flowFile;
-    }
-
-    public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) {
-        workingFlowFileRecord = flowFile;
-
-        // If setting attribute to same value as original, don't add to updated attributes
-        final String currentValue = originalAttributes.get(attributeKey);
-        if (currentValue == null || !currentValue.equals(attributeValue)) {
-            updatedAttributes.put(attributeKey, attributeValue);
-        }
-    }
-
-    public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) {
-        workingFlowFileRecord = flowFile;
-
-        for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) {
-            final String currentValue = originalAttributes.get(entry.getKey());
-            if (currentValue == null || !currentValue.equals(entry.getValue())) {
-                updatedAttributes.put(entry.getKey(), entry.getValue());
-            }
-        }
-    }
-
-    @Override
-    public boolean isAttributesChanged() {
-        return !updatedAttributes.isEmpty();
-    }
-
-    public void markForAbort() {
-        type = RepositoryRecordType.CONTENTMISSING;
-    }
-
-    @Override
-    public boolean isMarkedForAbort() {
-        return RepositoryRecordType.CONTENTMISSING.equals(type);
-    }
-
-    public void markForDelete() {
-        type = RepositoryRecordType.DELETE;
-    }
-
-    public boolean isMarkedForDelete() {
-        return RepositoryRecordType.DELETE.equals(type);
-    }
-
-    public void setTransferRelationship(final Relationship relationship) {
-        transferRelationship = relationship;
-    }
-
-    public Relationship getTransferRelationship() {
-        return transferRelationship;
-    }
-
-    FlowFileRecord getWorking() {
-        return workingFlowFileRecord;
-    }
-
-    ContentClaim getWorkingClaim() {
-        return (workingFlowFileRecord == null) ? null : workingFlowFileRecord.getContentClaim();
-    }
-
-    @Override
-    public FlowFileRecord getCurrent() {
-        return (workingFlowFileRecord == null) ? originalFlowFileRecord : workingFlowFileRecord;
-    }
-
-    @Override
-    public ContentClaim getCurrentClaim() {
-        return (getCurrent() == null) ? null : getCurrent().getContentClaim();
-    }
-
-    @Override
-    public long getCurrentClaimOffset() {
-        return (getCurrent() == null) ? 0L : getCurrent().getContentClaimOffset();
-    }
-
-    boolean isWorking() {
-        return (workingFlowFileRecord != null);
-    }
-
-    Map<String, String> getOriginalAttributes() {
-        return originalAttributes;
-    }
-
-    Map<String, String> getUpdatedAttributes() {
-        return updatedAttributes;
-    }
-
-    @Override
-    public String toString() {
-        return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]";
-    }
-
-    @Override
-    public List<ContentClaim> getTransientClaims() {
-        return transientClaims == null ? Collections.<ContentClaim> emptyList() : Collections.unmodifiableList(transientClaims);
-    }
-
-    void addTransientClaim(final ContentClaim claim) {
-        if (claim == null) {
-            return;
-        }
-
-        if (transientClaims == null) {
-            transientClaims = new ArrayList<>();
-        }
-        transientClaims.add(claim);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
deleted file mode 100644
index e8ce44e..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller.repository;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wali.SerDe;
-import org.wali.UpdateType;
-
-public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
-    private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
-
-    private static final int CURRENT_ENCODING_VERSION = 9;
-
-    public static final byte ACTION_CREATE = 0;
-    public static final byte ACTION_UPDATE = 1;
-    public static final byte ACTION_DELETE = 2;
-    public static final byte ACTION_SWAPPED_OUT = 3;
-    public static final byte ACTION_SWAPPED_IN = 4;
-
-    private long recordsRestored = 0L;
-    private final ResourceClaimManager claimManager;
-
-    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
-        this.claimManager = claimManager;
-    }
-
-    @Override
-    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
-        serializeEdit(previousRecordState, record, out, false);
-    }
-
-    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
-        if (record.isMarkedForAbort()) {
-            logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
-            out.write(ACTION_DELETE);
-            out.writeLong(getRecordIdentifier(record));
-            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
-            return;
-        }
-
-        final UpdateType updateType = getUpdateType(record);
-
-        if (updateType.equals(UpdateType.DELETE)) {
-            out.write(ACTION_DELETE);
-            out.writeLong(getRecordIdentifier(record));
-            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
-            return;
-        }
-
-        // If there's a Destination Connection, that's the one that we want to associated with this record.
-        // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
-        // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
-        // so we use the originalConnection instead
-        FlowFileQueue associatedQueue = record.getDestination();
-        if (associatedQueue == null) {
-            associatedQueue = record.getOriginalQueue();
-        }
-
-        if (updateType.equals(UpdateType.SWAP_OUT)) {
-            out.write(ACTION_SWAPPED_OUT);
-            out.writeLong(getRecordIdentifier(record));
-            out.writeUTF(associatedQueue.getIdentifier());
-            out.writeUTF(getLocation(record));
-            return;
-        }
-
-        final FlowFile flowFile = record.getCurrent();
-        final ContentClaim claim = record.getCurrentClaim();
-
-        switch (updateType) {
-            case UPDATE:
-                out.write(ACTION_UPDATE);
-                break;
-            case CREATE:
-                out.write(ACTION_CREATE);
-                break;
-            case SWAP_IN:
-                out.write(ACTION_SWAPPED_IN);
-                break;
-            default:
-                throw new AssertionError();
-        }
-
-        out.writeLong(getRecordIdentifier(record));
-        out.writeLong(flowFile.getEntryDate());
-        out.writeLong(flowFile.getLineageStartDate());
-        out.writeLong(flowFile.getLineageStartIndex());
-
-        final Long queueDate = flowFile.getLastQueueDate();
-        out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
-        out.writeLong(flowFile.getQueueDateIndex());
-        out.writeLong(flowFile.getSize());
-
-        if (associatedQueue == null) {
-            logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
-                new Object[] {this, record});
-            writeString("", out);
-        } else {
-            writeString(associatedQueue.getIdentifier(), out);
-        }
-
-        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
-
-        if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
-            out.write(1);   // indicate attributes changed
-            final Map<String, String> attributes = flowFile.getAttributes();
-            out.writeInt(attributes.size());
-            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
-                writeString(entry.getKey(), out);
-                writeString(entry.getValue(), out);
-            }
-        } else {
-            out.write(0);   // indicate attributes did not change
-        }
-
-        if (updateType == UpdateType.SWAP_IN) {
-            out.writeUTF(record.getSwapLocation());
-        }
-    }
-
-    @Override
-    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
-        final int action = in.read();
-        final long recordId = in.readLong();
-        if (action == ACTION_DELETE) {
-            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
-            if (version > 4) {
-                deserializeClaim(in, version, ffBuilder);
-            }
-
-            final FlowFileRecord flowFileRecord = ffBuilder.build();
-            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
-            record.markForDelete();
-
-            return record;
-        }
-
-        if (action == ACTION_SWAPPED_OUT) {
-            final String queueId = in.readUTF();
-            final String location = in.readUTF();
-            final FlowFileQueue queue = getFlowFileQueue(queueId);
-
-            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
-                .id(recordId)
-                .build();
-
-            return new StandardRepositoryRecord(queue, flowFileRecord, location);
-        }
-
-        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-        final RepositoryRecord record = currentRecordStates.get(recordId);
-        ffBuilder.id(recordId);
-        if (record != null) {
-            ffBuilder.fromFlowFile(record.getCurrent());
-        }
-        ffBuilder.entryDate(in.readLong());
-
-        if (version > 1) {
-            // read the lineage identifiers and lineage start date, which were added in version 2.
-            if (version < 9) {
-                final int numLineageIds = in.readInt();
-                for (int i = 0; i < numLineageIds; i++) {
-                    in.readUTF(); //skip identifiers
-                }
-            }
-            final long lineageStartDate = in.readLong();
-            final long lineageStartIndex;
-            if (version > 7) {
-                lineageStartIndex = in.readLong();
-            } else {
-                lineageStartIndex = 0L;
-            }
-            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
-            if (version > 5) {
-                final long lastQueueDate = in.readLong();
-                final long queueDateIndex;
-                if (version > 7) {
-                    queueDateIndex = in.readLong();
-                } else {
-                    queueDateIndex = 0L;
-                }
-
-                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-            }
-        }
-
-        ffBuilder.size(in.readLong());
-        final String connectionId = readString(in);
-
-        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
-
-        deserializeClaim(in, version, ffBuilder);
-
-        // recover new attributes, if they changed
-        final int attributesChanged = in.read();
-        if (attributesChanged == -1) {
-            throw new EOFException();
-        } else if (attributesChanged == 1) {
-            final int numAttributes = in.readInt();
-            final Map<String, String> attributes = new HashMap<>();
-            for (int i = 0; i < numAttributes; i++) {
-                final String key = readString(in);
-                final String value = readString(in);
-                attributes.put(key, value);
-            }
-
-            ffBuilder.addAttributes(attributes);
-        } else if (attributesChanged != 0) {
-            throw new IOException("Attribute Change Qualifier not found in stream; found value: "
-                + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
-        }
-
-        final FlowFileRecord flowFile = ffBuilder.build();
-        String swapLocation = null;
-        if (action == ACTION_SWAPPED_IN) {
-            swapLocation = in.readUTF();
-        }
-
-        final FlowFileQueue queue = getFlowFileQueue(connectionId);
-        final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
-        if (swapLocation != null) {
-            standardRepoRecord.setSwapLocation(swapLocation);
-        }
-
-        if (connectionId.isEmpty()) {
-            logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile);
-            standardRepoRecord.markForAbort();
-        } else if (queue == null) {
-            logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId);
-            standardRepoRecord.markForAbort();
-        }
-
-        recordsRestored++;
-        return standardRepoRecord;
-    }
-
-    @Override
-    public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
-        final int action = in.read();
-        if (action == -1) {
-            return null;
-        }
-
-        final long recordId = in.readLong();
-        if (action == ACTION_DELETE) {
-            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
-            if (version > 4) {
-                deserializeClaim(in, version, ffBuilder);
-            }
-
-            final FlowFileRecord flowFileRecord = ffBuilder.build();
-            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
-            record.markForDelete();
-            return record;
-        }
-
-        // if action was not delete, it must be create/swap in
-        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
-        final long entryDate = in.readLong();
-
-        if (version > 1) {
-            // read the lineage identifiers and lineage start date, which were added in version 2.
-            if (version < 9) {
-                final int numLineageIds = in.readInt();
-                for (int i = 0; i < numLineageIds; i++) {
-                    in.readUTF(); //skip identifiers
-                }
-            }
-
-            final long lineageStartDate = in.readLong();
-            final long lineageStartIndex;
-            if (version > 7) {
-                lineageStartIndex = in.readLong();
-            } else {
-                lineageStartIndex = 0L;
-            }
-            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
-            if (version > 5) {
-                final long lastQueueDate = in.readLong();
-                final long queueDateIndex;
-                if (version > 7) {
-                    queueDateIndex = in.readLong();
-                } else {
-                    queueDateIndex = 0L;
-                }
-
-                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
-            }
-        }
-
-        final long size = in.readLong();
-        final String connectionId = readString(in);
-
-        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
-
-        ffBuilder.id(recordId);
-        ffBuilder.entryDate(entryDate);
-        ffBuilder.size(size);
-
-        deserializeClaim(in, version, ffBuilder);
-
-        final int attributesChanged = in.read();
-        if (attributesChanged == 1) {
-            final int numAttributes = in.readInt();
-            final Map<String, String> attributes = new HashMap<>();
-            for (int i = 0; i < numAttributes; i++) {
-                final String key = readString(in);
-                final String value = readString(in);
-                attributes.put(key, value);
-            }
-
-            ffBuilder.addAttributes(attributes);
-        } else if (attributesChanged == -1) {
-            throw new EOFException();
-        } else if (attributesChanged != 0) {
-            throw new IOException("Attribute Change Qualifier not found in stream; found value: "
-                + attributesChanged + " after successfully restoring " + recordsRestored + " records");
-        }
-
-        final FlowFileRecord flowFile = ffBuilder.build();
-        String swapLocation = null;
-        if (action == ACTION_SWAPPED_IN) {
-            swapLocation = in.readUTF();
-        }
-
-        final StandardRepositoryRecord record;
-        final FlowFileQueue queue = getFlowFileQueue(connectionId);
-        record = new StandardRepositoryRecord(queue, flowFile);
-        if (swapLocation != null) {
-            record.setSwapLocation(swapLocation);
-        }
-
-        if (connectionId.isEmpty()) {
-            logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile);
-            record.markForAbort();
-        } else if (queue == null) {
-            logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId);
-            record.markForAbort();
-        }
-
-        recordsRestored++;
-        return record;
-    }
-
-    @Override
-    public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
-        serializeEdit(null, record, out, true);
-    }
-
-    private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
-        if (claim == null) {
-            out.write(0);
-        } else {
-            out.write(1);
-
-            final ResourceClaim resourceClaim = claim.getResourceClaim();
-            writeString(resourceClaim.getId(), out);
-            writeString(resourceClaim.getContainer(), out);
-            writeString(resourceClaim.getSection(), out);
-            out.writeLong(claim.getOffset());
-            out.writeLong(claim.getLength());
-
-            out.writeLong(offset);
-            out.writeBoolean(resourceClaim.isLossTolerant());
-        }
-    }
-
-    private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
-        // determine current Content Claim.
-        final int claimExists = in.read();
-        if (claimExists == 1) {
-            final String claimId;
-            if (serializationVersion < 4) {
-                claimId = String.valueOf(in.readLong());
-            } else {
-                claimId = readString(in);
-            }
-
-            final String container = readString(in);
-            final String section = readString(in);
-
-            final long resourceOffset;
-            final long resourceLength;
-            if (serializationVersion < 7) {
-                resourceOffset = 0L;
-                resourceLength = -1L;
-            } else {
-                resourceOffset = in.readLong();
-                resourceLength = in.readLong();
-            }
-
-            final long claimOffset = in.readLong();
-
-            final boolean lossTolerant;
-            if (serializationVersion >= 3) {
-                lossTolerant = in.readBoolean();
-            } else {
-                lossTolerant = false;
-            }
-
-            final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
-            final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
-            contentClaim.setLength(resourceLength);
-
-            ffBuilder.contentClaim(contentClaim);
-            ffBuilder.contentClaimOffset(claimOffset);
-        } else if (claimExists == -1) {
-            throw new EOFException();
-        } else if (claimExists != 0) {
-            throw new IOException("Claim Existence Qualifier not found in stream; found value: "
-                + claimExists + " after successfully restoring " + recordsRestored + " records");
-        }
-    }
-
-    private void writeString(final String toWrite, final OutputStream out) throws IOException {
-        final byte[] bytes = toWrite.getBytes("UTF-8");
-        final int utflen = bytes.length;
-
-        if (utflen < 65535) {
-            out.write(utflen >>> 8);
-            out.write(utflen);
-            out.write(bytes);
-        } else {
-            out.write(255);
-            out.write(255);
-            out.write(utflen >>> 24);
-            out.write(utflen >>> 16);
-            out.write(utflen >>> 8);
-            out.write(utflen);
-            out.write(bytes);
-        }
-    }
-
-    private String readString(final InputStream in) throws IOException {
-        final Integer numBytes = readFieldLength(in);
-        if (numBytes == null) {
-            throw new EOFException();
-        }
-        final byte[] bytes = new byte[numBytes];
-        fillBuffer(in, bytes, numBytes);
-        return new String(bytes, "UTF-8");
-    }
-
-    private Integer readFieldLength(final InputStream in) throws IOException {
-        final int firstValue = in.read();
-        final int secondValue = in.read();
-        if (firstValue < 0) {
-            return null;
-        }
-        if (secondValue < 0) {
-            throw new EOFException();
-        }
-        if (firstValue == 0xff && secondValue == 0xff) {
-            final int ch1 = in.read();
-            final int ch2 = in.read();
-            final int ch3 = in.read();
-            final int ch4 = in.read();
-            if ((ch1 | ch2 | ch3 | ch4) < 0) {
-                throw new EOFException();
-            }
-            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
-        } else {
-            return (firstValue << 8) + secondValue;
-        }
-    }
-
-    private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
-        int bytesRead;
-        int totalBytesRead = 0;
-        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
-            totalBytesRead += bytesRead;
-        }
-        if (totalBytesRead != length) {
-            throw new EOFException();
-        }
-    }
-
-    @Override
-    public int getVersion() {
-        return CURRENT_ENCODING_VERSION;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
deleted file mode 100644
index 39a2591..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.claim;
-
-
-/**
- * <p>
- * A ContentClaim is a reference to a given flow file's content. Multiple flow files may reference the same content by both having the same content claim.</p>
- *
- * <p>
- * Must be thread safe</p>
- *
- */
-public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {
-
-    private final ResourceClaim resourceClaim;
-    private final long offset;
-    private volatile long length;
-
-    public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
-        this.resourceClaim = resourceClaim;
-        this.offset = offset;
-        this.length = -1L;
-    }
-
-    public void setLength(final long length) {
-        this.length = length;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result;
-        result = prime * result + (int) (offset ^ offset >>> 32);
-        result = prime * result + (resourceClaim == null ? 0 : resourceClaim.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-
-        if (obj == null) {
-            return false;
-        }
-
-        if (!(obj instanceof ContentClaim)) {
-            return false;
-        }
-
-        final ContentClaim other = (ContentClaim) obj;
-        if (offset != other.getOffset()) {
-            return false;
-        }
-
-        return resourceClaim.equals(other.getResourceClaim());
-    }
-
-    @Override
-    public int compareTo(final ContentClaim o) {
-        final int resourceComp = resourceClaim.compareTo(o.getResourceClaim());
-        if (resourceComp != 0) {
-            return resourceComp;
-        }
-
-        return Long.compare(offset, o.getOffset());
-    }
-
-    @Override
-    public ResourceClaim getResourceClaim() {
-        return resourceClaim;
-    }
-
-    @Override
-    public long getOffset() {
-        return offset;
-    }
-
-    @Override
-    public long getLength() {
-        return length;
-    }
-
-    @Override
-    public String toString() {
-        return "StandardContentClaim [resourceClaim=" + resourceClaim + ", offset=" + offset + ", length=" + length + "]";
-    }
-}