You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/02/18 22:26:39 UTC
nifi git commit: NIFI-1527: Ensure that we increment Claimant Counts
for content claims that are referenced by Swapped-Out FlowFiles on restart of
nifi
Repository: nifi
Updated Branches:
refs/heads/master 1c73d9090 -> 3bb18b965
NIFI-1527: Ensure that we increment Claimant Counts for content claims that are referenced by Swapped-Out FlowFiles on restart of nifi
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3bb18b96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3bb18b96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3bb18b96
Branch: refs/heads/master
Commit: 3bb18b965348b439cadc8f548429ab1347f9c305
Parents: 1c73d90
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 18 11:14:35 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 18 16:25:57 2016 -0500
----------------------------------------------------------------------
.../nifi/controller/queue/FlowFileQueue.java | 13 +-
.../apache/nifi/controller/queue/QueueSize.java | 26 +-
.../repository/FlowFileSwapManager.java | 17 +-
.../nifi/controller/repository/SwapSummary.java | 50 ++++
.../nifi/controller/FileSystemSwapManager.java | 52 ++--
.../apache/nifi/controller/FlowController.java | 14 +-
.../nifi/controller/StandardFlowFileQueue.java | 13 +-
.../controller/swap/StandardSwapSummary.java | 54 ++++
.../controller/TestStandardFlowFileQueue.java | 31 ++-
.../TestWriteAheadFlowFileRepository.java | 247 +++++++++++++++++--
10 files changed, 418 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index f2066a8..7948ecb 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
@@ -40,16 +41,14 @@ public interface FlowFileQueue {
List<FlowFilePrioritizer> getPriorities();
/**
- * Reads any Swap Files that belong to this queue and increments counts so that the size
- * of the queue will reflect the size of all FlowFiles regardless of whether or not they are
- * swapped out. This will be called only during NiFi startup as an initialization step. This
- * method is then responsible for returning the largest ID of any FlowFile that is swapped
+ * Reads any Swap Files that belong to this queue and returns a summary of what is swapped out.
+ * This will be called only during NiFi startup as an initialization step. This
+ * method is then responsible for returning a FlowFileSummary of the FlowFiles that are swapped
* out, or <code>null</code> if no FlowFiles are swapped out for this queue.
*
- * @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if
- * no FlowFiles are swapped out for this queue.
+ * @return a SwapSummary that describes the FlowFiles that exist in the queue but are swapped out.
*/
- Long recoverSwappedFlowFiles();
+ SwapSummary recoverSwappedFlowFiles();
/**
* Destroys any Swap Files that exist for this queue without updating the FlowFile Repository
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
index a59db4c..35a860b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -22,9 +22,9 @@ import java.text.NumberFormat;
*
*/
public class QueueSize {
-
private final int objectCount;
private final long totalSizeBytes;
+ private final int hashCode;
public QueueSize(final int numberObjects, final long totalSizeBytes) {
if (numberObjects < 0 || totalSizeBytes < 0) {
@@ -32,6 +32,7 @@ public class QueueSize {
}
objectCount = numberObjects;
this.totalSizeBytes = totalSizeBytes;
+ hashCode = (int) (41 + 47 * objectCount + 51 * totalSizeBytes);
}
/**
@@ -66,4 +67,27 @@ public class QueueSize {
public String toString() {
return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
}
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (!(obj instanceof QueueSize)) {
+ return false;
+ }
+
+ final QueueSize other = (QueueSize) obj;
+ return getObjectCount() == other.getObjectCount() && getByteCount() == other.getByteCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 3e341f8..e07cf1a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.QueueSize;
/**
* Defines a mechanism by which FlowFiles can be move into external storage or
@@ -90,20 +89,14 @@ public interface FlowFileSwapManager {
List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
/**
- * Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location
+ * Parses the contents of the swap file at the given location and provides a SwapSummary that provides
+ * pertinent information about the information stored within the swap file
*
* @param swapLocation the location of the swap file
- * @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out
+ * @return a SwapSummary that provides information about what is contained within the swap file
+ * @throws IOException if unable to read or parse the swap file
*/
- QueueSize getSwapSize(String swapLocation) throws IOException;
-
- /**
- * Returns the maximum record id of the FlowFiles stored at the given swap location
- *
- * @param swapLocation the swap location to read id's from
- * @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found
- */
- Long getMaxRecordId(String swapLocation) throws IOException;
+ SwapSummary getSwapSummary(String swapLocation) throws IOException;
/**
* Purge all known Swap Files without updating FlowFileRepository or Provenance Repository
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java
new file mode 100644
index 0000000..ad01ccd
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapSummary.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.List;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+
+/**
+ * <p>
+ * Provides a summary of the information that is stored in a FlowFile swap file.
+ * </p>
+ */
+public interface SwapSummary {
+ /**
+ * @return a QueueSize that represents the number of FlowFiles are in the swap file and their
+ * aggregate content size
+ */
+ QueueSize getQueueSize();
+
+ /**
+ * @return the largest ID of any of the FlowFiles that are contained in the swap file
+ */
+ Long getMaxFlowFileId();
+
+ /**
+ * Returns a List of all ResoruceClaims that are referenced by the FlowFiles in the swap file.
+ * This List may well contain the same ResourceClaim many times. This indicates that many FlowFiles
+ * reference the same ResourceClaim.
+ *
+ * @return a List of all ResourceClaims that are referenced by the FlowFiles in the swap file
+ */
+ List<ResourceClaim> getResourceClaims();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 76f7cd7..105dcff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -47,10 +47,12 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
@@ -237,8 +239,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
return swapLocations;
}
+ @SuppressWarnings("deprecation")
@Override
- public QueueSize getSwapSize(final String swapLocation) throws IOException {
+ public SwapSummary getSwapSummary(final String swapLocation) throws IOException {
final File swapFile = new File(swapLocation);
// read record from disk via the swap file
@@ -259,56 +262,36 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final int numRecords = in.readInt();
final long contentSize = in.readLong();
- return new QueueSize(numRecords, contentSize);
- }
- }
-
- @Override
- public Long getMaxRecordId(final String swapLocation) throws IOException {
- final File swapFile = new File(swapLocation);
-
- // read record from disk via the swap file
- try (final InputStream fis = new FileInputStream(swapFile);
- final InputStream bufferedIn = new BufferedInputStream(fis);
- final DataInputStream in = new DataInputStream(bufferedIn)) {
-
- final int swapEncodingVersion = in.readInt();
- if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
- throw new IOException(errMsg);
- }
-
- in.readUTF(); // ignore connection id
- final int numRecords = in.readInt();
- in.readLong(); // ignore content size
-
if (numRecords == 0) {
- return null;
+ return StandardSwapSummary.EMPTY_SUMMARY;
}
+ Long maxRecordId = null;
if (swapEncodingVersion > 7) {
- final long maxRecordId = in.readLong();
- return maxRecordId;
+ maxRecordId = in.readLong();
}
// Before swap encoding version 8, we did not write out the max record id, so we have to read all
// swap files to determine the max record id
+ final List<ResourceClaim> resourceClaims = new ArrayList<>(numRecords);
final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, swapEncodingVersion, true, claimManager);
- long maxId = 0L;
for (final FlowFileRecord record : records) {
- if (record.getId() > maxId) {
- maxId = record.getId();
+ if (maxRecordId == null || record.getId() > maxRecordId) {
+ maxRecordId = record.getId();
+ }
+
+ final ContentClaim contentClaim = record.getContentClaim();
+ if (contentClaim != null) {
+ resourceClaims.add(contentClaim.getResourceClaim());
}
}
- return maxId;
+ return new StandardSwapSummary(new QueueSize(numRecords, contentSize), maxRecordId, resourceClaims);
}
}
+ @SuppressWarnings("deprecation")
public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
if (toSwap == null || toSwap.isEmpty()) {
return 0;
@@ -636,5 +619,4 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dab2d3d..6d09bf6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -73,6 +73,7 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -591,9 +592,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else {
for (final Connection connection : connections) {
final FlowFileQueue queue = connection.getFlowFileQueue();
- final Long maxFlowFileId = queue.recoverSwappedFlowFiles();
- if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
- maxIdFromSwapFiles = maxFlowFileId;
+ final SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
+ if (swapSummary != null) {
+ final Long maxFlowFileId = swapSummary.getMaxFlowFileId();
+ if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
+ maxIdFromSwapFiles = maxFlowFileId;
+ }
+
+ for (final ResourceClaim resourceClaim : swapSummary.getResourceClaims()) {
+ resourceClaimManager.incrementClaimantCount(resourceClaim);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 04763ee..22aacdc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -49,9 +49,11 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -787,10 +789,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
- public Long recoverSwappedFlowFiles() {
+ public SwapSummary recoverSwappedFlowFiles() {
int swapFlowFileCount = 0;
long swapByteCount = 0L;
Long maxId = null;
+ List<ResourceClaim> resourceClaims = new ArrayList<>();
writeLock.lock();
try {
@@ -809,8 +812,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
for (final String swapLocation : swapLocations) {
try {
- final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
- final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
+ final SwapSummary summary = swapManager.getSwapSummary(swapLocation);
+ final QueueSize queueSize = summary.getQueueSize();
+ final Long maxSwapRecordId = summary.getMaxFlowFileId();
if (maxSwapRecordId != null) {
if (maxId == null || maxSwapRecordId > maxId) {
maxId = maxSwapRecordId;
@@ -819,6 +823,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swapFlowFileCount += queueSize.getObjectCount();
swapByteCount += queueSize.getByteCount();
+ resourceClaims.addAll(summary.getResourceClaims());
} catch (final IOException ioe) {
logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
logger.error("", ioe);
@@ -835,7 +840,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("Recover Swap Files");
}
- return maxId;
+ return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java
new file mode 100644
index 0000000..57a2ffb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapSummary.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.swap;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+
+public class StandardSwapSummary implements SwapSummary {
+ public static final SwapSummary EMPTY_SUMMARY = new StandardSwapSummary(new QueueSize(0, 0L), null, Collections.<ResourceClaim> emptyList());
+
+ private final QueueSize queueSize;
+ private final Long maxFlowFileId;
+ private final List<ResourceClaim> resourceClaims;
+
+ public StandardSwapSummary(final QueueSize queueSize, final Long maxFlowFileId, final List<ResourceClaim> resourceClaims) {
+ this.queueSize = queueSize;
+ this.maxFlowFileId = maxFlowFileId;
+ this.resourceClaims = Collections.unmodifiableList(resourceClaims);
+ }
+
+ @Override
+ public QueueSize getQueueSize() {
+ return queueSize;
+ }
+
+ @Override
+ public Long getMaxFlowFileId() {
+ return maxFlowFileId;
+ }
+
+ @Override
+ public List<ResourceClaim> getResourceClaims() {
+ return resourceClaims;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index b84ae90..412e376 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -46,8 +46,11 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -472,37 +475,30 @@ public class TestStandardFlowFileQueue {
}
@Override
- public QueueSize getSwapSize(String swapLocation) throws IOException {
+ @SuppressWarnings("deprecation")
+ public SwapSummary getSwapSummary(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
if (flowFiles == null) {
- return new QueueSize(0, 0L);
+ return StandardSwapSummary.EMPTY_SUMMARY;
}
int count = 0;
long size = 0L;
+ Long max = null;
+ final List<ResourceClaim> resourceClaims = new ArrayList<>();
for (final FlowFileRecord flowFile : flowFiles) {
count++;
size += flowFile.getSize();
- }
-
- return new QueueSize(count, size);
- }
-
- @Override
- public Long getMaxRecordId(String swapLocation) throws IOException {
- final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
- if (flowFiles == null) {
- return null;
- }
-
- Long max = null;
- for (final FlowFileRecord flowFile : flowFiles) {
if (max == null || flowFile.getId() > max) {
max = flowFile.getId();
}
+
+ if (flowFile.getContentClaim() != null) {
+ resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
+ }
}
- return max;
+ return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
}
@Override
@@ -588,6 +584,7 @@ public class TestStandardFlowFileQueue {
}
@Override
+ @SuppressWarnings("deprecation")
public int compareTo(final FlowFile o) {
return Long.compare(id, o.getId());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3bb18b96/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index e836b44..c1c7b45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -16,27 +16,40 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
-import org.apache.nifi.controller.repository.StandardRepositoryRecord;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.util.file.FileUtils;
-
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -44,9 +57,106 @@ import org.mockito.stubbing.Answer;
public class TestWriteAheadFlowFileRepository {
+ @BeforeClass
+ public static void setupProperties() {
+ System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
+ }
+
+ @Before
+ public void clearRepo() throws IOException {
+ final File target = new File("target");
+ final File testRepo = new File(target, "test-repo");
+ if (testRepo.exists()) {
+ FileUtils.deleteFile(testRepo, true);
+ }
+ }
+
+ @Test
+ public void testResourceClaimsIncremented() throws IOException {
+ final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+ when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, null);
+
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+
+ final ResourceClaim resourceClaim1 = new StandardResourceClaim("container", "section", "1", false);
+ final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
+
+ final ResourceClaim resourceClaim2 = new StandardResourceClaim("container", "section", "2", false);
+ final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
+
+ // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
+ // indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the
+ // resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
+ try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository()) {
+ repo.initialize(claimManager);
+ repo.loadFlowFiles(queueProvider, -1L);
+
+ // Create a Repository Record that indicates that a FlowFile was created
+ final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+ .contentClaim(claim1)
+ .build();
+ final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
+ rec1.setWorking(flowFile1);
+ rec1.setDestination(queue);
+
+ // Create a Record that we can swap out
+ final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
+ .id(2L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
+ .contentClaim(claim2)
+ .build();
+
+ final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
+ rec2.setWorking(flowFile2);
+ rec2.setDestination(queue);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ records.add(rec1);
+ records.add(rec2);
+ repo.updateRepository(records);
+
+ final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue);
+ repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
+ }
+
+
+ final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
+ try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository()) {
+ repo.initialize(recoveryClaimManager);
+ final long largestId = repo.loadFlowFiles(queueProvider, 0L);
+
+ // largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
+ assertEquals(1, largestId);
+ }
+
+ // resource claim 1 will have a single claimant count while resource claim 2 will have no claimant counts
+ // because resource claim 2 is referenced only by flowfiles that are swapped out.
+ assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
+ assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
+
+ final SwapSummary summary = queue.recoverSwappedFlowFiles();
+ assertNotNull(summary);
+ assertEquals(2, summary.getMaxFlowFileId().intValue());
+ assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
+
+ final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
+ assertNotNull(swappedOutClaims);
+ assertEquals(1, swappedOutClaims.size());
+ assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
+ }
+
@Test
public void testRestartWithOneRecord() throws IOException {
- System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
final Path path = Paths.get("target/test-repo");
if (Files.exists(path)) {
FileUtils.deleteFile(path.toFile(), true);
@@ -55,19 +165,7 @@ public class TestWriteAheadFlowFileRepository {
final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository();
repo.initialize(new StandardResourceClaimManager());
- final List<Connection> connectionList = new ArrayList<>();
- final QueueProvider queueProvider = new QueueProvider() {
- @Override
- public Collection<FlowFileQueue> getAllQueues() {
- final List<FlowFileQueue> queueList = new ArrayList<>();
- for (final Connection conn : connectionList) {
- queueList.add(conn.getFlowFileQueue());
- }
-
- return queueList;
- }
- };
-
+ final TestQueueProvider queueProvider = new TestQueueProvider();
repo.loadFlowFiles(queueProvider, 0L);
final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
@@ -87,7 +185,7 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getFlowFileQueue()).thenReturn(queue);
- connectionList.add(connection);
+ queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
@@ -132,4 +230,113 @@ public class TestWriteAheadFlowFileRepository {
repo2.close();
}
+ private static class TestQueueProvider implements QueueProvider {
+ private List<Connection> connectionList = new ArrayList<>();
+
+ public void addConnection(final Connection connection) {
+ this.connectionList.add(connection);
+ }
+
+ @Override
+ public Collection<FlowFileQueue> getAllQueues() {
+ final List<FlowFileQueue> queueList = new ArrayList<>();
+ for (final Connection conn : connectionList) {
+ queueList.add(conn.getFlowFileQueue());
+ }
+
+ return queueList;
+ }
+ }
+
+ private static class MockFlowFileSwapManager implements FlowFileSwapManager {
+ private final Map<FlowFileQueue, Map<String, List<FlowFileRecord>>> swappedRecords = new HashMap<>();
+
+ @Override
+ public void initialize(SwapManagerInitializationContext initializationContext) {
+ }
+
+ @Override
+ public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+ Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+ if (swapMap == null) {
+ swapMap = new HashMap<>();
+ swappedRecords.put(flowFileQueue, swapMap);
+ }
+
+ final String location = UUID.randomUUID().toString();
+ swapMap.put(location, new ArrayList<>(flowFiles));
+ return location;
+ }
+
+ @Override
+ public List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+ Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+ if (swapMap == null) {
+ return null;
+ }
+
+ return Collections.unmodifiableList(swapMap.get(swapLocation));
+ }
+
+ @Override
+ public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+ Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+ if (swapMap == null) {
+ return null;
+ }
+
+ return swapMap.remove(swapLocation);
+ }
+
+ @Override
+ public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+ Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
+ if (swapMap == null) {
+ return null;
+ }
+
+ return new ArrayList<>(swapMap.keySet());
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public SwapSummary getSwapSummary(String swapLocation) throws IOException {
+ List<FlowFileRecord> records = null;
+ for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {
+ records = swapMap.get(swapLocation);
+ if (records != null) {
+ break;
+ }
+ }
+
+ if (records == null) {
+ return null;
+ }
+
+ final List<ResourceClaim> resourceClaims = new ArrayList<>();
+ long size = 0L;
+ Long maxId = null;
+ for (final FlowFileRecord flowFile : records) {
+ size += flowFile.getSize();
+
+ if (maxId == null || flowFile.getId() > maxId) {
+ maxId = flowFile.getId();
+ }
+
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim != null) {
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ resourceClaims.add(resourceClaim);
+ }
+ }
+
+ return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
+ }
+
+ @Override
+ public void purge() {
+ this.swappedRecords.clear();
+ }
+
+ }
}