You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:04 UTC
[4/7] nifi git commit: NIFI-2854: Refactor repositories and swap
files to use schema-based serialization so that nifi can be rolled back to a
previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
new file mode 100644
index 0000000..76c208d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.List;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ContentClaimFieldMap implements Record {
+ private final ContentClaim contentClaim;
+ private final long contentClaimOffset;
+ private final ResourceClaimFieldMap resourceClaimFieldMap;
+ private final RecordSchema schema;
+
+ public ContentClaimFieldMap(final ContentClaim contentClaim, final long contentClaimOffset, final RecordSchema schema) {
+ this.contentClaim = contentClaim;
+ this.contentClaimOffset = contentClaimOffset;
+ this.schema = schema;
+
+ final List<RecordField> resourceClaimFields = schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields();
+ final RecordSchema resourceClaimSchema = new RecordSchema(resourceClaimFields);
+ this.resourceClaimFieldMap = new ResourceClaimFieldMap(contentClaim.getResourceClaim(), resourceClaimSchema);
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ switch (fieldName) {
+ case ContentClaimSchema.RESOURCE_CLAIM:
+ return resourceClaimFieldMap;
+ case ContentClaimSchema.CONTENT_CLAIM_LENGTH:
+ return contentClaim.getLength();
+ case ContentClaimSchema.CONTENT_CLAIM_OFFSET:
+ return contentClaimOffset;
+ case ContentClaimSchema.RESOURCE_CLAIM_OFFSET:
+ return contentClaim.getOffset();
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public String toString() {
+ return "ContentClaimFieldMap[" + contentClaim + "]";
+ }
+
+ public static ContentClaim getContentClaim(final Record claimRecord, final ResourceClaimManager resourceClaimManager) {
+ final Record resourceClaimRecord = (Record) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM);
+ final String container = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+ final String section = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+ final String identifier = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+ final Boolean lossTolerant = (Boolean) resourceClaimRecord.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+ final Long length = (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH);
+ final Long resourceOffset = (Long) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET);
+
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, lossTolerant, false);
+ final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
+ contentClaim.setLength(length);
+
+ return contentClaim;
+ }
+
+ public static Long getContentClaimOffset(final Record claimRecord) {
+ return (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_OFFSET);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
new file mode 100644
index 0000000..c55c758
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimSchema.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class ContentClaimSchema {
+
+ // resource claim fields
+ public static final String CLAIM_CONTAINER = "Container";
+ public static final String CLAIM_SECTION = "Section";
+ public static final String CLAIM_IDENTIFIER = "Identifier";
+ public static final String LOSS_TOLERANT = "Loss Tolerant";
+ public static final String RESOURCE_CLAIM = "Resource Claim";
+
+ // content claim fields
+ public static final String RESOURCE_CLAIM_OFFSET = "Resource Claim Offset"; // offset into resource claim where the content claim begins
+ public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; // offset into the content claim where the flowfile begins
+ public static final String CONTENT_CLAIM_LENGTH = "Content Claim Length";
+
+ public static final RecordSchema CONTENT_CLAIM_SCHEMA_V1;
+ public static final RecordSchema RESOURCE_CLAIM_SCHEMA_V1;
+
+ static {
+ final List<RecordField> resourceClaimFields = new ArrayList<>();
+ resourceClaimFields.add(new SimpleRecordField(CLAIM_CONTAINER, FieldType.STRING, Repetition.EXACTLY_ONE));
+ resourceClaimFields.add(new SimpleRecordField(CLAIM_SECTION, FieldType.STRING, Repetition.EXACTLY_ONE));
+ resourceClaimFields.add(new SimpleRecordField(CLAIM_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+ resourceClaimFields.add(new SimpleRecordField(LOSS_TOLERANT, FieldType.BOOLEAN, Repetition.EXACTLY_ONE));
+ RESOURCE_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(resourceClaimFields));
+
+ final List<RecordField> contentClaimFields = new ArrayList<>();
+ contentClaimFields.add(new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, resourceClaimFields));
+ contentClaimFields.add(new SimpleRecordField(RESOURCE_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE));
+ contentClaimFields.add(new SimpleRecordField(CONTENT_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE));
+ contentClaimFields.add(new SimpleRecordField(CONTENT_CLAIM_LENGTH, FieldType.LONG, Repetition.EXACTLY_ONE));
+ CONTENT_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(contentClaimFields));
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
new file mode 100644
index 0000000..ff0615f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class FlowFileRecordFieldMap implements Record {
+ private final FlowFileRecord flowFile;
+ private final RecordSchema schema;
+ private final RecordSchema contentClaimSchema;
+ private final ContentClaimFieldMap contentClaim;
+
+ public FlowFileRecordFieldMap(final FlowFileRecord flowFile, final RecordSchema schema) {
+ this.flowFile = flowFile;
+ this.schema = schema;
+
+ final RecordField contentClaimField = schema.getField(FlowFileSchema.CONTENT_CLAIM);
+ contentClaimSchema = new RecordSchema(contentClaimField.getSubFields());
+ contentClaim = flowFile.getContentClaim() == null ? null : new ContentClaimFieldMap(flowFile.getContentClaim(), flowFile.getContentClaimOffset(), contentClaimSchema);
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ switch (fieldName) {
+ case FlowFileSchema.ATTRIBUTES:
+ return flowFile.getAttributes();
+ case FlowFileSchema.CONTENT_CLAIM:
+ return contentClaim;
+ case FlowFileSchema.ENTRY_DATE:
+ return flowFile.getEntryDate();
+ case FlowFileSchema.FLOWFILE_SIZE:
+ return flowFile.getSize();
+ case FlowFileSchema.LINEAGE_START_DATE:
+ return flowFile.getLineageStartDate();
+ case FlowFileSchema.LINEAGE_START_INDEX:
+ return flowFile.getLineageStartIndex();
+ case FlowFileSchema.QUEUE_DATE:
+ return flowFile.getLastQueueDate();
+ case FlowFileSchema.QUEUE_DATE_INDEX:
+ return flowFile.getQueueDateIndex();
+ case FlowFileSchema.RECORD_ID:
+ return flowFile.getId();
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static FlowFileRecord getFlowFile(final Record record, final ResourceClaimManager claimManager) {
+ final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
+ builder.id((Long) record.getFieldValue(FlowFileSchema.RECORD_ID));
+ builder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE));
+ builder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE));
+ builder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES));
+ builder.lineageStart((Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE), (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX));
+ builder.lastQueued((Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE), (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX));
+
+ final Record contentClaimRecord = (Record) record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
+ if (contentClaimRecord != null) {
+ final ContentClaim claim = ContentClaimFieldMap.getContentClaim(contentClaimRecord, claimManager);
+ builder.contentClaim(claim);
+
+ final Long offset = ContentClaimFieldMap.getContentClaimOffset(contentClaimRecord);
+ if (offset != null) {
+ builder.contentClaimOffset(offset);
+ }
+ }
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
new file mode 100644
index 0000000..53eab70
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class FlowFileSchema {
+
+ public static final String RECORD_ID = "Record ID";
+ public static final String ENTRY_DATE = "Entry Date";
+ public static final String LINEAGE_START_DATE = "Lineage Start Date";
+ public static final String LINEAGE_START_INDEX = "Lineage Start Index";
+ public static final String QUEUE_DATE = "Queued Date";
+ public static final String QUEUE_DATE_INDEX = "Queued Date Index";
+ public static final String FLOWFILE_SIZE = "FlowFile Size";
+ public static final String CONTENT_CLAIM = "Content Claim";
+ public static final String ATTRIBUTES = "Attributes";
+
+ // attribute fields
+ public static final String ATTRIBUTE_NAME = "Attribute Name";
+ public static final String ATTRIBUTE_VALUE = "Attribute Value";
+
+ public static final RecordSchema FLOWFILE_SCHEMA_V1;
+
+ static {
+ final List<RecordField> flowFileFields = new ArrayList<>();
+
+ final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.STRING, Repetition.EXACTLY_ONE);
+ final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE);
+
+ flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE));
+ flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields()));
+ flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE));
+
+ FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
new file mode 100644
index 0000000..9804dec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class RepositoryRecordFieldMap implements Record {
+ private final RepositoryRecord record;
+ private final FlowFileRecord flowFile;
+ private final RecordSchema schema;
+ private final RecordSchema contentClaimSchema;
+
+ public RepositoryRecordFieldMap(final RepositoryRecord record, final RecordSchema repoRecordSchema, final RecordSchema contentClaimSchema) {
+ this.schema = repoRecordSchema;
+ this.contentClaimSchema = contentClaimSchema;
+ this.record = record;
+ this.flowFile = record.getCurrent();
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ switch (fieldName) {
+ case RepositoryRecordSchema.ACTION_TYPE:
+ return record.getType().name();
+ case RepositoryRecordSchema.RECORD_ID:
+ return record.getCurrent().getId();
+ case RepositoryRecordSchema.SWAP_LOCATION:
+ return record.getSwapLocation();
+ case FlowFileSchema.ATTRIBUTES:
+ return flowFile.getAttributes();
+ case FlowFileSchema.ENTRY_DATE:
+ return flowFile.getEntryDate();
+ case FlowFileSchema.FLOWFILE_SIZE:
+ return flowFile.getSize();
+ case FlowFileSchema.LINEAGE_START_DATE:
+ return flowFile.getLineageStartDate();
+ case FlowFileSchema.LINEAGE_START_INDEX:
+ return flowFile.getLineageStartIndex();
+ case FlowFileSchema.QUEUE_DATE:
+ return flowFile.getLastQueueDate();
+ case FlowFileSchema.QUEUE_DATE_INDEX:
+ return flowFile.getQueueDateIndex();
+ case FlowFileSchema.CONTENT_CLAIM:
+ final ContentClaimFieldMap contentClaimFieldMap = record.getCurrentClaim() == null ? null
+ : new ContentClaimFieldMap(record.getCurrentClaim(), record.getCurrentClaimOffset(), contentClaimSchema);
+ return contentClaimFieldMap;
+ case RepositoryRecordSchema.QUEUE_IDENTIFIER:
+ final FlowFileQueue queue = record.getDestination() == null ? record.getOriginalQueue() : record.getDestination();
+ return queue.getIdentifier();
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public String toString() {
+ return "RepositoryRecordFieldMap[" + record + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
new file mode 100644
index 0000000..5887c8a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.apache.nifi.repository.schema.UnionRecordField;
+
+public class RepositoryRecordSchema {
+
+ public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository Record Update"; // top level field name
+
+ // repository record fields
+ public static final String ACTION_TYPE = "Action";
+ public static final String RECORD_ID = "Record ID";
+ public static final String QUEUE_IDENTIFIER = "Queue Identifier";
+ public static final String SWAP_LOCATION = "Swap Location";
+
+ // Update types
+ public static final String CREATE_OR_UPDATE_ACTION = "Create or Update";
+ public static final String DELETE_ACTION = "Delete";
+ public static final String SWAP_IN_ACTION = "Swap In";
+ public static final String SWAP_OUT_ACTION = "Swap Out";
+
+ public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V1;
+ public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V1;
+ public static final RecordSchema DELETE_SCHEMA_V1;
+ public static final RecordSchema SWAP_IN_SCHEMA_V1;
+ public static final RecordSchema SWAP_OUT_SCHEMA_V1;
+
+ public static final RecordField ACTION_TYPE_FIELD = new SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE);
+ public static final RecordField RECORD_ID_FIELD = new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
+
+ static {
+ // Fields for "Create" or "Update" records
+ final List<RecordField> createOrUpdateFields = new ArrayList<>();
+ createOrUpdateFields.add(ACTION_TYPE_FIELD);
+ createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+ createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+ createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE));
+ final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields);
+ CREATE_OR_UPDATE_SCHEMA_V1 = new RecordSchema(createOrUpdateFields);
+
+ // Fields for "Delete" records
+ final List<RecordField> deleteFields = new ArrayList<>();
+ deleteFields.add(ACTION_TYPE_FIELD);
+ deleteFields.add(RECORD_ID_FIELD);
+ final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields);
+ DELETE_SCHEMA_V1 = new RecordSchema(deleteFields);
+
+ // Fields for "Swap Out" records
+ final List<RecordField> swapOutFields = new ArrayList<>();
+ swapOutFields.add(ACTION_TYPE_FIELD);
+ swapOutFields.add(RECORD_ID_FIELD);
+ swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
+ swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
+ final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields);
+ SWAP_OUT_SCHEMA_V1 = new RecordSchema(swapOutFields);
+
+ // Fields for "Swap In" records
+ final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields);
+ swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE));
+ final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields);
+ SWAP_IN_SCHEMA_V1 = new RecordSchema(swapInFields);
+
+ // Union Field that creates the top-level field type
+ final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn);
+ REPOSITORY_RECORD_SCHEMA_V1 = new RecordSchema(Collections.singletonList(repoUpdateField));
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
new file mode 100644
index 0000000..ad51f4d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.repository.schema.NamedValue;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.wali.UpdateType;
+
+public class RepositoryRecordUpdate implements Record {
+ private final RecordSchema schema;
+ private final RepositoryRecordFieldMap fieldMap;
+
+ public RepositoryRecordUpdate(final RepositoryRecordFieldMap fieldMap, final RecordSchema schema) {
+ this.schema = schema;
+ this.fieldMap = fieldMap;
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1.equals(fieldName)) {
+ final String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
+ final UpdateType updateType = UpdateType.valueOf(actionType);
+
+ final String actionName;
+ switch (updateType) {
+ case CREATE:
+ case UPDATE:
+ actionName = RepositoryRecordSchema.CREATE_OR_UPDATE_ACTION;
+ break;
+ case DELETE:
+ actionName = RepositoryRecordSchema.DELETE_ACTION;
+ break;
+ case SWAP_IN:
+ actionName = RepositoryRecordSchema.SWAP_IN_ACTION;
+ break;
+ case SWAP_OUT:
+ actionName = RepositoryRecordSchema.SWAP_OUT_ACTION;
+ break;
+ default:
+ return null;
+ }
+
+ return new NamedValue(actionName, fieldMap);
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
new file mode 100644
index 0000000..afa19ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ResourceClaimFieldMap implements Record {
+ private final ResourceClaim resourceClaim;
+ private final RecordSchema schema;
+
+ public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) {
+ this.resourceClaim = resourceClaim;
+ this.schema = schema;
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ switch (fieldName) {
+ case ContentClaimSchema.CLAIM_CONTAINER:
+ return resourceClaim.getContainer();
+ case ContentClaimSchema.CLAIM_SECTION:
+ return resourceClaim.getSection();
+ case ContentClaimSchema.CLAIM_IDENTIFIER:
+ return resourceClaim.getId();
+ case ContentClaimSchema.LOSS_TOLERANT:
+ return resourceClaim.isLossTolerant();
+ }
+
+ return null;
+ }
+
+ public static ResourceClaim getResourceClaim(final Record record, final ResourceClaimManager claimManager) {
+ final String container = (String) record.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+ final String section = (String) record.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+ final String identifier = (String) record.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+ final Boolean lossTolerant = (Boolean) record.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+ return claimManager.newResourceClaim(container, section, identifier, lossTolerant, false);
+ }
+
+ @Override
+ public int hashCode() {
+ return 41 + 91 * resourceClaim.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj.getClass() != ResourceClaimFieldMap.class) {
+ return false;
+ }
+
+ final ResourceClaimFieldMap other = (ResourceClaimFieldMap) obj;
+ return resourceClaim.equals(other.resourceClaim);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
new file mode 100644
index 0000000..88e1415
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.FlowFileRecordFieldMap;
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+
+public class SchemaSwapDeserializer implements SwapDeserializer {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
+ final RecordSchema schema = RecordSchema.readFrom(in);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema);
+
+ final Record parentRecord = reader.readRecord(in);
+ final List<Record> flowFileRecords = (List<Record>) parentRecord.getFieldValue(SwapSchema.FLOWFILE_CONTENTS);
+
+ final List<FlowFileRecord> flowFiles = new ArrayList<>(flowFileRecords.size());
+ for (final Record record : flowFileRecords) {
+ flowFiles.add(FlowFileRecordFieldMap.getFlowFile(record, claimManager));
+ }
+
+ final Record summaryRecord = (Record) parentRecord.getFieldValue(SwapSchema.SWAP_SUMMARY);
+ final SwapSummary swapSummary = SwapSummaryFieldMap.getSwapSummary(summaryRecord, claimManager);
+
+ return new StandardSwapContents(swapSummary, flowFiles);
+ }
+
+ @Override
+ public SwapSummary getSwapSummary(final DataInputStream in, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+ final RecordSchema schema = RecordSchema.readFrom(in);
+ final List<RecordField> summaryFields = schema.getField(SwapSchema.SWAP_SUMMARY).getSubFields();
+ final RecordField summaryRecordField = new ComplexRecordField(SwapSchema.SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields);
+ final RecordSchema summarySchema = new RecordSchema(Collections.singletonList(summaryRecordField));
+
+ final Record summaryRecordParent = SchemaRecordReader.fromSchema(summarySchema).readRecord(in);
+ final Record summaryRecord = (Record) summaryRecordParent.getFieldValue(SwapSchema.SWAP_SUMMARY);
+ final SwapSummary swapSummary = SwapSummaryFieldMap.getSwapSummary(summaryRecord, claimManager);
+ return swapSummary;
+ }
+
+ public static String getSerializationName() {
+ return SchemaSwapSerializer.SERIALIZATION_NAME;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
new file mode 100644
index 0000000..195f55a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.schema.FlowFileRecordFieldMap;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class SchemaSwapSerializer implements SwapSerializer {
+ static final String SERIALIZATION_NAME = "Schema Swap Serialization";
+
+ private final RecordSchema schema = SwapSchema.FULL_SWAP_FILE_SCHEMA_V1;
+ private final RecordSchema flowFileSchema = new RecordSchema(schema.getField(SwapSchema.FLOWFILE_CONTENTS).getSubFields());
+
+ @Override
+ public void serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream out) throws IOException {
+ schema.writeTo(out);
+
+ long contentSize = 0L;
+ long maxFlowFileId = -1L;
+ final List<ResourceClaim> resourceClaims = new ArrayList<>();
+ for (final FlowFileRecord flowFile : toSwap) {
+ contentSize += flowFile.getSize();
+ if (flowFile.getId() > maxFlowFileId) {
+ maxFlowFileId = flowFile.getId();
+ }
+
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim != null) {
+ resourceClaims.add(contentClaim.getResourceClaim());
+ }
+ }
+
+ final QueueSize queueSize = new QueueSize(toSwap.size(), contentSize);
+ final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims);
+ final Record summaryRecord = new SwapSummaryFieldMap(swapSummary, queue.getIdentifier(), SwapSchema.SWAP_SUMMARY_SCHEMA_V1);
+
+ final List<Record> flowFileRecords = toSwap.stream()
+ .map(flowFile -> new FlowFileRecordFieldMap(flowFile, flowFileSchema))
+ .collect(Collectors.toList());
+
+ // Create a simple record to hold the summary and the flowfile contents
+ final RecordField summaryField = new SimpleRecordField(SwapSchema.SWAP_SUMMARY, FieldType.COMPLEX, Repetition.EXACTLY_ONE);
+ final RecordField contentsField = new ComplexRecordField(SwapSchema.FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+ final List<RecordField> fields = new ArrayList<>(2);
+ fields.add(summaryField);
+ fields.add(contentsField);
+
+ final Map<RecordField, Object> swapFileMap = new LinkedHashMap<>();
+ swapFileMap.put(summaryField, summaryRecord);
+ swapFileMap.put(contentsField, flowFileRecords);
+ final Record swapFileRecord = new FieldMapRecord(swapFileMap, new RecordSchema(fields));
+
+ final SchemaRecordWriter writer = new SchemaRecordWriter();
+ writer.writeRecord(swapFileRecord, out);
+ out.flush();
+ }
+
+ @Override
+ public String getSerializationName() {
+ return SERIALIZATION_NAME;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java
new file mode 100644
index 0000000..b86d9a8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapDeserializer.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
+import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleSwapDeserializer implements SwapDeserializer {
+ public static final int SWAP_ENCODING_VERSION = 10;
+ private static final Logger logger = LoggerFactory.getLogger(SimpleSwapDeserializer.class);
+
+ @Override
+ public SwapSummary getSwapSummary(final DataInputStream in, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+ final int swapEncodingVersion = in.readInt();
+ if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+ final String errMsg = "Cannot swap FlowFiles in from " + swapLocation + " because the encoding version is "
+ + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
+
+ throw new IOException(errMsg);
+ }
+
+ final int numRecords;
+ final long contentSize;
+ Long maxRecordId = null;
+ try {
+ in.readUTF(); // ignore Connection ID
+ numRecords = in.readInt();
+ contentSize = in.readLong();
+
+ if (numRecords == 0) {
+ return StandardSwapSummary.EMPTY_SUMMARY;
+ }
+
+ if (swapEncodingVersion > 7) {
+ maxRecordId = in.readLong();
+ }
+ } catch (final EOFException eof) {
+ logger.warn("Found premature End-of-File when reading Swap File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
+ return StandardSwapSummary.EMPTY_SUMMARY;
+ }
+
+ final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+ final SwapContents swapContents = deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, claimManager, swapLocation);
+ return swapContents.getSummary();
+ }
+
+
+ @Override
+ public SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
+ final int swapEncodingVersion = in.readInt();
+ if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+ throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
+ + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+ }
+
+ final String connectionId = in.readUTF(); // Connection ID
+ if (!connectionId.equals(queue.getIdentifier())) {
+ throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation
+ + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
+ }
+
+ int numRecords = 0;
+ long contentSize = 0L;
+ Long maxRecordId = null;
+ try {
+ numRecords = in.readInt();
+ contentSize = in.readLong(); // Content Size
+ if (swapEncodingVersion > 7) {
+ maxRecordId = in.readLong(); // Max Record ID
+ }
+ } catch (final EOFException eof) {
+ final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+ final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.emptyList());
+ final SwapContents partialContents = new StandardSwapContents(summary, Collections.emptyList());
+ throw new IncompleteSwapFileException(swapLocation, partialContents);
+ }
+
+ final QueueSize queueSize = new QueueSize(numRecords, contentSize);
+ return deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, claimManager, swapLocation);
+ }
+
+ private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId,
+ final int serializationVersion, final ResourceClaimManager claimManager, final String location) throws IOException {
+ final List<FlowFileRecord> flowFiles = new ArrayList<>(queueSize.getObjectCount());
+ final List<ResourceClaim> resourceClaims = new ArrayList<>(queueSize.getObjectCount());
+ Long maxId = maxRecordId;
+
+ for (int i = 0; i < queueSize.getObjectCount(); i++) {
+ try {
+ // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
+ if (serializationVersion < 3) {
+ final int action = in.read();
+ if (action != 1) {
+ throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
+ }
+ }
+
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+ final long recordId = in.readLong();
+ if (maxId == null || recordId > maxId) {
+ maxId = recordId;
+ }
+
+ ffBuilder.id(recordId);
+ ffBuilder.entryDate(in.readLong());
+
+ if (serializationVersion > 1) {
+ // Lineage information was added in version 2
+ if (serializationVersion < 10) {
+ final int numLineageIdentifiers = in.readInt();
+ for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
+ in.readUTF(); //skip each identifier
+ }
+ }
+
+ // version 9 adds in a 'lineage start index'
+ final long lineageStartDate = in.readLong();
+ final long lineageStartIndex;
+ if (serializationVersion > 8) {
+ lineageStartIndex = in.readLong();
+ } else {
+ lineageStartIndex = 0L;
+ }
+
+ ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+ if (serializationVersion > 5) {
+ // Version 9 adds in a 'queue date index'
+ final long lastQueueDate = in.readLong();
+ final long queueDateIndex;
+ if (serializationVersion > 8) {
+ queueDateIndex = in.readLong();
+ } else {
+ queueDateIndex = 0L;
+ }
+
+ ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+ }
+ }
+
+ ffBuilder.size(in.readLong());
+
+ if (serializationVersion < 3) {
+ readString(in); // connection Id
+ }
+
+ final boolean hasClaim = in.readBoolean();
+ ResourceClaim resourceClaim = null;
+ if (hasClaim) {
+ final String claimId;
+ if (serializationVersion < 5) {
+ claimId = String.valueOf(in.readLong());
+ } else {
+ claimId = in.readUTF();
+ }
+
+ final String container = in.readUTF();
+ final String section = in.readUTF();
+
+ final long resourceOffset;
+ final long resourceLength;
+ if (serializationVersion < 6) {
+ resourceOffset = 0L;
+ resourceLength = -1L;
+ } else {
+ resourceOffset = in.readLong();
+ resourceLength = in.readLong();
+ }
+
+ final long claimOffset = in.readLong();
+
+ final boolean lossTolerant;
+ if (serializationVersion >= 4) {
+ lossTolerant = in.readBoolean();
+ } else {
+ lossTolerant = false;
+ }
+
+ resourceClaim = claimManager.getResourceClaim(container, section, claimId);
+ if (resourceClaim == null) {
+ logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, "
+ + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's "
+ + "ability to properly clean up this resource", container, section, claimId);
+ resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true);
+ }
+
+ final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
+ claim.setLength(resourceLength);
+
+ ffBuilder.contentClaim(claim);
+ ffBuilder.contentClaimOffset(claimOffset);
+ }
+
+ boolean attributesChanged = true;
+ if (serializationVersion < 3) {
+ attributesChanged = in.readBoolean();
+ }
+
+ if (attributesChanged) {
+ final int numAttributes = in.readInt();
+ for (int j = 0; j < numAttributes; j++) {
+ final String key = readString(in);
+ final String value = readString(in);
+
+ ffBuilder.addAttribute(key, value);
+ }
+ }
+
+ final FlowFileRecord record = ffBuilder.build();
+ if (resourceClaim != null) {
+ resourceClaims.add(resourceClaim);
+ }
+
+ flowFiles.add(record);
+ } catch (final EOFException eof) {
+ final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
+ final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles);
+ throw new IncompleteSwapFileException(location, partialContents);
+ }
+ }
+
+ final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
+ return new StandardSwapContents(swapSummary, flowFiles);
+ }
+
+ private static String readString(final InputStream in) throws IOException {
+ final Integer numBytes = readFieldLength(in);
+ if (numBytes == null) {
+ throw new EOFException();
+ }
+ final byte[] bytes = new byte[numBytes];
+ fillBuffer(in, bytes, numBytes);
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ private static Integer readFieldLength(final InputStream in) throws IOException {
+ final int firstValue = in.read();
+ final int secondValue = in.read();
+ if (firstValue < 0) {
+ return null;
+ }
+ if (secondValue < 0) {
+ throw new EOFException();
+ }
+ if (firstValue == 0xff && secondValue == 0xff) {
+ final int ch1 = in.read();
+ final int ch2 = in.read();
+ final int ch3 = in.read();
+ final int ch4 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0) {
+ throw new EOFException();
+ }
+ return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
+ } else {
+ return (firstValue << 8) + secondValue;
+ }
+ }
+
+ private static void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
+ int bytesRead;
+ int totalBytesRead = 0;
+ while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
+ totalBytesRead += bytesRead;
+ }
+ if (totalBytesRead != length) {
+ throw new EOFException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java
new file mode 100644
index 0000000..ea8b99b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SimpleSwapSerializer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @deprecated in favor of using {@link SchemaSwapSerializer}.
+ */
+@Deprecated
+public class SimpleSwapSerializer implements SwapSerializer {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleSwapSerializer.class);
+ public static final int SWAP_ENCODING_VERSION = 10;
+
+
+ @Override
+ public void serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
+ if (toSwap == null || toSwap.isEmpty()) {
+ return;
+ }
+
+ long contentSize = 0L;
+ for (final FlowFileRecord record : toSwap) {
+ contentSize += record.getSize();
+ }
+
+ // persist record to disk via the swap file
+ final DataOutputStream out = new DataOutputStream(destination);
+ try {
+ out.writeInt(SWAP_ENCODING_VERSION);
+ out.writeUTF(queue.getIdentifier());
+ out.writeInt(toSwap.size());
+ out.writeLong(contentSize);
+
+ // get the max record id and write that out so that we know it quickly for restoration
+ long maxRecordId = 0L;
+ for (final FlowFileRecord flowFile : toSwap) {
+ if (flowFile.getId() > maxRecordId) {
+ maxRecordId = flowFile.getId();
+ }
+ }
+
+ out.writeLong(maxRecordId);
+
+ for (final FlowFileRecord flowFile : toSwap) {
+ out.writeLong(flowFile.getId());
+ out.writeLong(flowFile.getEntryDate());
+ out.writeLong(flowFile.getLineageStartDate());
+ out.writeLong(flowFile.getLineageStartIndex());
+ out.writeLong(flowFile.getLastQueueDate());
+ out.writeLong(flowFile.getQueueDateIndex());
+ out.writeLong(flowFile.getSize());
+
+ final ContentClaim claim = flowFile.getContentClaim();
+ if (claim == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ final ResourceClaim resourceClaim = claim.getResourceClaim();
+ out.writeUTF(resourceClaim.getId());
+ out.writeUTF(resourceClaim.getContainer());
+ out.writeUTF(resourceClaim.getSection());
+ out.writeLong(claim.getOffset());
+ out.writeLong(claim.getLength());
+ out.writeLong(flowFile.getContentClaimOffset());
+ out.writeBoolean(resourceClaim.isLossTolerant());
+ }
+
+ final Map<String, String> attributes = flowFile.getAttributes();
+ out.writeInt(attributes.size());
+ for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+ writeString(entry.getKey(), out);
+ writeString(entry.getValue(), out);
+ }
+ }
+ } finally {
+ out.flush();
+ }
+
+ logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", toSwap.size(), queue, swapLocation);
+ }
+
+ private void writeString(final String toWrite, final OutputStream out) throws IOException {
+ final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
+ final int utflen = bytes.length;
+
+ if (utflen < 65535) {
+ out.write(utflen >>> 8);
+ out.write(utflen);
+ out.write(bytes);
+ } else {
+ out.write(255);
+ out.write(255);
+ out.write(utflen >>> 24);
+ out.write(utflen >>> 16);
+ out.write(utflen >>> 8);
+ out.write(utflen);
+ out.write(bytes);
+ }
+ }
+
+ @Override
+ public String getSerializationName() {
+ return "Simple Swap Serializer";
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java
new file mode 100644
index 0000000..a3fb30a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapDeserializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+
+public interface SwapDeserializer {
+
+ SwapContents deserializeFlowFiles(DataInputStream in, String swapLocation, FlowFileQueue queue, ResourceClaimManager claimManager) throws IOException;
+
+ SwapSummary getSwapSummary(DataInputStream in, String swapLocation, ResourceClaimManager claimManager) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
new file mode 100644
index 0000000..70fb539
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSchema.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class SwapSchema {
+
+ public static final RecordSchema SWAP_SUMMARY_SCHEMA_V1;
+ public static final RecordSchema SWAP_CONTENTS_SCHEMA_V1;
+ public static final RecordSchema FULL_SWAP_FILE_SCHEMA_V1;
+
+ public static final String RESOURCE_CLAIMS = "Resource Claims";
+ public static final String RESOURCE_CLAIM = "Resource Claim";
+ public static final String RESOURCE_CLAIM_COUNT = "Claim Count";
+
+ public static final String QUEUE_IDENTIFIER = "Queue Identifier";
+ public static final String FLOWFILE_COUNT = "FlowFile Count";
+ public static final String FLOWFILE_SIZE = "FlowFile Size";
+ public static final String MAX_RECORD_ID = "Max Record ID";
+ public static final String SWAP_SUMMARY = "Swap Summary";
+ public static final String FLOWFILE_CONTENTS = "FlowFiles";
+
+
+ static {
+ final RecordField queueIdentifier = new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
+ final RecordField flowFileCount = new SimpleRecordField(FLOWFILE_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
+ final RecordField flowFileSize = new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE);
+ final RecordField maxRecordId = new SimpleRecordField(MAX_RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE);
+
+ final RecordField resourceClaimField = new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, ContentClaimSchema.RESOURCE_CLAIM_SCHEMA_V1.getFields());
+ final RecordField claimCountField = new SimpleRecordField(RESOURCE_CLAIM_COUNT, FieldType.INT, Repetition.EXACTLY_ONE);
+ final RecordField resourceClaims = new MapRecordField(RESOURCE_CLAIMS, resourceClaimField, claimCountField, Repetition.EXACTLY_ONE);
+
+ final List<RecordField> summaryFields = new ArrayList<>();
+ summaryFields.add(queueIdentifier);
+ summaryFields.add(flowFileCount);
+ summaryFields.add(flowFileSize);
+ summaryFields.add(maxRecordId);
+ summaryFields.add(resourceClaims);
+ SWAP_SUMMARY_SCHEMA_V1 = new RecordSchema(summaryFields);
+
+ final RecordField flowFiles = new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+ final List<RecordField> contentsFields = Collections.singletonList(flowFiles);
+ SWAP_CONTENTS_SCHEMA_V1 = new RecordSchema(contentsFields);
+
+ final List<RecordField> fullSchemaFields = new ArrayList<>();
+ fullSchemaFields.add(new ComplexRecordField(SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields));
+ fullSchemaFields.add(new ComplexRecordField(FLOWFILE_CONTENTS, Repetition.ZERO_OR_MORE, FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()));
+ FULL_SWAP_FILE_SCHEMA_V1 = new RecordSchema(fullSchemaFields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java
new file mode 100644
index 0000000..e8439e6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSerializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+public interface SwapSerializer {
+
+ void serializeFlowFiles(List<FlowFileRecord> toSwap, FlowFileQueue queue, String swapLocation, OutputStream destination) throws IOException;
+
+ String getSerializationName();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
new file mode 100644
index 0000000..ab58ed6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SwapSummaryFieldMap.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.ResourceClaimFieldMap;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class SwapSummaryFieldMap implements Record {
+ private final SwapSummary swapSummary;
+ private final RecordSchema schema;
+ private final String queueIdentifier;
+ private final Map<ResourceClaimFieldMap, Integer> claimCounts;
+
+ public SwapSummaryFieldMap(final SwapSummary summary, final String queueIdentifier, final RecordSchema schema) {
+ this.swapSummary = summary;
+ this.queueIdentifier = queueIdentifier;
+ this.schema = schema;
+
+ final RecordField resourceClaimField = schema.getField(SwapSchema.RESOURCE_CLAIMS).getSubFields().get(0);
+ final RecordSchema resourceClaimSchema = new RecordSchema(resourceClaimField.getSubFields());
+
+ final List<ResourceClaim> resourceClaims = summary.getResourceClaims();
+ claimCounts = new HashMap<>();
+ for (final ResourceClaim claim : resourceClaims) {
+ final ResourceClaimFieldMap fieldMap = new ResourceClaimFieldMap(claim, resourceClaimSchema);
+
+ final Integer count = claimCounts.get(fieldMap);
+ if (count == null) {
+ claimCounts.put(fieldMap, 1);
+ } else {
+ claimCounts.put(fieldMap, count + 1);
+ }
+ }
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ switch (fieldName) {
+ case SwapSchema.MAX_RECORD_ID:
+ return swapSummary.getMaxFlowFileId();
+ case SwapSchema.FLOWFILE_COUNT:
+ return swapSummary.getQueueSize().getObjectCount();
+ case SwapSchema.FLOWFILE_SIZE:
+ return swapSummary.getQueueSize().getByteCount();
+ case SwapSchema.QUEUE_IDENTIFIER:
+ return queueIdentifier;
+ case SwapSchema.RESOURCE_CLAIMS:
+ return claimCounts;
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static SwapSummary getSwapSummary(final Record record, final ResourceClaimManager claimManager) {
+ final int flowFileCount = (Integer) record.getFieldValue(SwapSchema.FLOWFILE_COUNT);
+ final long flowFileSize = (Long) record.getFieldValue(SwapSchema.FLOWFILE_SIZE);
+ final QueueSize queueSize = new QueueSize(flowFileCount, flowFileSize);
+
+ final long maxFlowFileId = (Long) record.getFieldValue(SwapSchema.MAX_RECORD_ID);
+
+ final Map<Record, Integer> resourceClaimRecords = (Map<Record, Integer>) record.getFieldValue(SwapSchema.RESOURCE_CLAIMS);
+ final List<ResourceClaim> resourceClaims = new ArrayList<>();
+ for (final Map.Entry<Record, Integer> entry : resourceClaimRecords.entrySet()) {
+ final Record resourceClaimRecord = entry.getKey();
+ final ResourceClaim claim = ResourceClaimFieldMap.getResourceClaim(resourceClaimRecord, claimManager);
+
+ for (int i = 0; i < entry.getValue(); i++) {
+ resourceClaims.add(claim);
+ }
+ }
+
+ return new StandardSwapSummary(queueSize, maxFlowFileId, resourceClaims);
+ }
+}