You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/13 16:03:59 UTC
[2/5] nifi git commit: NIFI-730: Added methods for dropping queued
flowfiles;
refactored swap manager but have not yet started swapping flowfiles in or out
from within the flowfile queue
NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b8c51dc3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b8c51dc3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b8c51dc3
Branch: refs/heads/NIFI-730
Commit: b8c51dc35d1a7fdbf3e6449bbe297db667a1176c
Parents: b4bfcc1
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 11 10:27:07 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 11 10:27:07 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/controller/FlowFileQueue.java | 203 ------
.../controller/queue/DropFlowFileState.java | 40 ++
.../controller/queue/DropFlowFileStatus.java | 62 ++
.../nifi/controller/queue/FlowFileQueue.java | 256 ++++++++
.../apache/nifi/controller/queue/QueueSize.java | 48 ++
.../repository/FlowFileRepository.java | 2 +-
.../repository/FlowFileSwapManager.java | 94 ++-
.../controller/repository/QueueProvider.java | 2 +-
.../controller/repository/RepositoryRecord.java | 2 +-
.../SwapManagerInitializationContext.java | 41 ++
.../apache/nifi/processor/ProcessSession.java | 1 +
.../org/apache/nifi/processor/QueueSize.java | 48 --
.../org/apache/nifi/connectable/Connection.java | 2 +-
.../nifi/controller/StandardFlowFileQueue.java | 108 +++-
.../nifi/connectable/StandardConnection.java | 26 +-
.../nifi/controller/FileSystemSwapManager.java | 626 ++++++++-----------
.../apache/nifi/controller/FlowController.java | 193 +++---
.../repository/BatchingSessionFactory.java | 2 +-
.../repository/ConnectionSwapInfo.java | 58 --
.../repository/StandardProcessSession.java | 4 +-
.../repository/StandardRepositoryRecord.java | 2 +-
.../repository/VolatileFlowFileRepository.java | 2 +-
.../WriteAheadFlowFileRepository.java | 2 +-
.../controller/TestFileSystemSwapManager.java | 6 +-
.../repository/TestStandardProcessSession.java | 9 +-
.../TestWriteAheadFlowFileRepository.java | 2 +-
26 files changed, 1003 insertions(+), 838 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
deleted file mode 100644
index e1baeb7..0000000
--- a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.QueueSize;
-
-public interface FlowFileQueue {
-
- /**
- * @return the unique identifier for this FlowFileQueue
- */
- String getIdentifier();
-
- /**
- * @return list of processing priorities for this queue
- */
- List<FlowFilePrioritizer> getPriorities();
-
- /**
- * @return the minimum number of FlowFiles that must be present in order for
- * FlowFiles to begin being swapped out of the queue
- */
- int getSwapThreshold();
-
- /**
- * Resets the comparator used by this queue to maintain order.
- *
- * @param newPriorities the ordered list of prioritizers to use to determine
- * order within this queue.
- * @throws NullPointerException if arg is null
- */
- void setPriorities(List<FlowFilePrioritizer> newPriorities);
-
- /**
- * Establishes this queue's preferred maximum work load.
- *
- * @param maxQueueSize the maximum number of flow files this processor
- * recommends having in its work queue at any one time
- */
- void setBackPressureObjectThreshold(long maxQueueSize);
-
- /**
- * @return maximum number of flow files that should be queued up at any one
- * time
- */
- long getBackPressureObjectThreshold();
-
- /**
- * @param maxDataSize Establishes this queue's preferred maximum data size.
- */
- void setBackPressureDataSizeThreshold(String maxDataSize);
-
- /**
- * @return maximum data size that should be queued up at any one time
- */
- String getBackPressureDataSizeThreshold();
-
- QueueSize size();
-
- /**
- * @return total size in bytes of the queue flow file's content
- */
- long contentSize();
-
- /**
- * @return true if no items queue; false otherwise
- */
- boolean isEmpty();
-
- /**
- * @return true if the active queue is empty; false otherwise. The Active
- * queue contains those FlowFiles that can be processed immediately and does
- * not include those FlowFiles that have been swapped out or are currently
- * being processed
- */
- boolean isActiveQueueEmpty();
-
- QueueSize getActiveQueueSize();
-
- /**
- * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
- * is considered to be unacknowledged if it has been pulled from the queue by some component
- * but the session that pulled the FlowFile has not yet been committed or rolled back.
- *
- * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
- */
- QueueSize getUnacknowledgedQueueSize();
-
- void acknowledge(FlowFileRecord flowFile);
-
- void acknowledge(Collection<FlowFileRecord> flowFiles);
-
- /**
- * @return true if maximum queue size has been reached or exceeded; false
- * otherwise
- */
- boolean isFull();
-
- /**
- * places the given file into the queue
- *
- * @param file to place into queue
- */
- void put(FlowFileRecord file);
-
- /**
- * places the given files into the queue
- *
- * @param files to place into queue
- */
- void putAll(Collection<FlowFileRecord> files);
-
- /**
- * Removes all records from the internal swap queue and returns them.
- *
- * @return all removed records from internal swap queue
- */
- List<FlowFileRecord> pollSwappableRecords();
-
- /**
- * Restores the records from swap space into this queue, adding the records
- * that have expired to the given set instead of enqueuing them.
- *
- * @param records that were swapped in
- */
- void putSwappedRecords(Collection<FlowFileRecord> records);
-
- /**
- * Updates the internal counters of how much data is queued, based on
- * swapped data that is being restored.
- *
- * @param numRecords count of records swapped in
- * @param contentSize total size of records being swapped in
- */
- void incrementSwapCount(int numRecords, long contentSize);
-
- /**
- * @return the number of FlowFiles that are enqueued and not swapped
- */
- int unswappedSize();
-
- int getSwapRecordCount();
-
- int getSwapQueueSize();
-
- /**
- * @param expiredRecords expired records
- * @return the next flow file on the queue; null if empty
- */
- FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
-
- /**
- * @param maxResults limits how many results can be polled
- * @param expiredRecords for expired records
- * @return the next flow files on the queue up to the max results; null if
- * empty
- */
- List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
-
- /**
- * Drains flow files from the given source queue into the given destination
- * list.
- *
- * @param sourceQueue queue to drain from
- * @param destination Collection to drain to
- * @param maxResults max number to drain
- * @param expiredRecords for expired records
- * @return size (bytes) of flow files drained from queue
- */
- long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
-
- List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
-
- String getFlowFileExpiration();
-
- int getFlowFileExpiration(TimeUnit timeUnit);
-
- void setFlowFileExpiration(String flowExpirationPeriod);
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
new file mode 100644
index 0000000..3f16d00
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the state that a Drop FlowFile request is in
+ */
+public enum DropFlowFileState {
+
+ WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"),
+ DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"),
+ COMPLETE("Completed Successfully"),
+ FAILURE("Failed");
+
+ private final String description;
+
+ private DropFlowFileState(final String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
new file mode 100644
index 0000000..b216608
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the status of a Drop FlowFile Request that has been issued to
+ * a {@link FlowFileQueue}. When a queue is requested to drop its FlowFiles,
+ * that process may be rather lengthy in the case of a poorly behaving
+ * FlowFileRepository or if the destination Processor is polling from the
+ * queue using a filter that is misbehaving. As a result, the dropping of
+ * FlowFiles is performed asynchronously.
+ *
+ * This status object provides information about how far along in the process
+ * we currently are and information about the success or failure of the
+ * operation.
+ */
+public interface DropFlowFileStatus {
+
+ /**
+ * @return the identifier of the request to drop FlowFiles from the queue
+ */
+ String getRequestIdentifier();
+
+ /**
+ * @return the date/time (in milliseconds since epoch) at which the request to
+ * drop the FlowFiles from a queue was submitted
+ */
+ long getRequestSubmissionTime();
+
+ /**
+ * @return the size of the queue when the drop request was issued or <code>null</code> if
+ * it is not yet known, which can happen if the {@link DropFlowFileState} is
+ * {@link DropFlowFileState#WAITING_FOR_LOCK}.
+ */
+ QueueSize getOriginalSize();
+
+ /**
+ * @return the current size of the queue or <code>null</code> if it is not yet known
+ */
+ QueueSize getCurrentSize();
+
+ /**
+ * @return the current state of the operation
+ */
+ DropFlowFileState getState();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
new file mode 100644
index 0000000..31f17e0
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public interface FlowFileQueue {
+
+ /**
+ * @return the unique identifier for this FlowFileQueue
+ */
+ String getIdentifier();
+
+ /**
+ * @return list of processing priorities for this queue
+ */
+ List<FlowFilePrioritizer> getPriorities();
+
+ /**
+ * Reads any Swap Files that belong to this queue and increments counts so that the size
+ * of the queue will reflect the size of all FlowFiles regardless of whether or not they are
+ * swapped out. This will be called only during NiFi startup as an initialization step. This
+ * method is then responsible for returning the largest ID of any FlowFile that is swapped
+ * out, or <code>null</code> if no FlowFiles are swapped out for this queue.
+ *
+ * @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if
+ * no FlowFiles are swapped out for this queue.
+ */
+ Long recoverSwappedFlowFiles();
+
+ /**
+ * Destroys any Swap Files that exist for this queue without updating the FlowFile Repository
+ * or Provenance Repository. This is done only on startup in the case of non-persistent
+ * repositories. In the case of non-persistent repositories, we may still have Swap Files because
+ * we may still need to overflow the FlowFiles from heap onto disk, even though we don't want to keep
+ * the FlowFiles on restart.
+ */
+ void purgeSwapFiles();
+
+ /**
+ * @return the minimum number of FlowFiles that must be present in order for
+ * FlowFiles to begin being swapped out of the queue
+ */
+ // TODO: REMOVE THIS.
+ int getSwapThreshold();
+
+ /**
+ * Resets the comparator used by this queue to maintain order.
+ *
+ * @param newPriorities the ordered list of prioritizers to use to determine
+ * order within this queue.
+ * @throws NullPointerException if arg is null
+ */
+ void setPriorities(List<FlowFilePrioritizer> newPriorities);
+
+ /**
+ * Establishes this queue's preferred maximum work load.
+ *
+ * @param maxQueueSize the maximum number of flow files this processor
+ * recommends having in its work queue at any one time
+ */
+ void setBackPressureObjectThreshold(long maxQueueSize);
+
+ /**
+ * @return maximum number of flow files that should be queued up at any one
+ * time
+ */
+ long getBackPressureObjectThreshold();
+
+ /**
+ * @param maxDataSize Establishes this queue's preferred maximum data size.
+ */
+ void setBackPressureDataSizeThreshold(String maxDataSize);
+
+ /**
+ * @return maximum data size that should be queued up at any one time
+ */
+ String getBackPressureDataSizeThreshold();
+
+ QueueSize size();
+
+ /**
+ * @return true if no items queue; false otherwise
+ */
+ boolean isEmpty();
+
+ /**
+ * @return true if the active queue is empty; false otherwise. The Active
+ * queue contains those FlowFiles that can be processed immediately and does
+ * not include those FlowFiles that have been swapped out or are currently
+ * being processed
+ */
+ // TODO: REMOVE?
+ boolean isActiveQueueEmpty();
+
+ // TODO: REMOVE?
+ QueueSize getActiveQueueSize();
+
+ /**
+ * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
+ * is considered to be unacknowledged if it has been pulled from the queue by some component
+ * but the session that pulled the FlowFile has not yet been committed or rolled back.
+ *
+ * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
+ */
+ QueueSize getUnacknowledgedQueueSize();
+
+ void acknowledge(FlowFileRecord flowFile);
+
+ void acknowledge(Collection<FlowFileRecord> flowFiles);
+
+ /**
+ * @return true if maximum queue size has been reached or exceeded; false
+ * otherwise
+ */
+ boolean isFull();
+
+ /**
+ * places the given file into the queue
+ *
+ * @param file to place into queue
+ */
+ void put(FlowFileRecord file);
+
+ /**
+ * places the given files into the queue
+ *
+ * @param files to place into queue
+ */
+ void putAll(Collection<FlowFileRecord> files);
+
+ /**
+ * Removes all records from the internal swap queue and returns them.
+ *
+ * @return all removed records from internal swap queue
+ */
+ // TODO: REMOVE THIS?
+ List<FlowFileRecord> pollSwappableRecords();
+
+ /**
+ * Restores the records from swap space into this queue, adding the records
+ * that have expired to the given set instead of enqueuing them.
+ *
+ * @param records that were swapped in
+ */
+ // TODO: REMOVE THIS?
+ void putSwappedRecords(Collection<FlowFileRecord> records);
+
+ /**
+ * Updates the internal counters of how much data is queued, based on
+ * swapped data that is being restored.
+ *
+ * @param numRecords count of records swapped in
+ * @param contentSize total size of records being swapped in
+ */
+ // TODO: REMOVE THIS?
+ void incrementSwapCount(int numRecords, long contentSize);
+
+ /**
+ * @return the number of FlowFiles that are enqueued and not swapped
+ */
+ // TODO: REMOVE THIS?
+ int unswappedSize();
+
+ // TODO: REMOVE THIS?
+ int getSwapRecordCount();
+
+ // TODO: REMOVE THIS?
+ int getSwapQueueSize();
+
+ /**
+ * @param expiredRecords expired records
+ * @return the next flow file on the queue; null if empty
+ */
+ FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
+
+ /**
+ * @param maxResults limits how many results can be polled
+ * @param expiredRecords for expired records
+ * @return the next flow files on the queue up to the max results; null if
+ * empty
+ */
+ List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
+
+ /**
+ * Drains flow files from the given source queue into the given destination
+ * list.
+ *
+ * @param sourceQueue queue to drain from
+ * @param destination Collection to drain to
+ * @param maxResults max number to drain
+ * @param expiredRecords for expired records
+ * @return size (bytes) of flow files drained from queue
+ */
+ long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
+
+ List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
+
+ String getFlowFileExpiration();
+
+ int getFlowFileExpiration(TimeUnit timeUnit);
+
+ void setFlowFileExpiration(String flowExpirationPeriod);
+
+ /**
+ * Initiates a request to drop all FlowFiles in this queue. This method returns
+ * a DropFlowFileStatus that can be used to determine the current state of the request.
+ * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+ * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)}
+ * methods in order to obtain the status later or cancel a request
+ *
+ * @return the status of the drop request.
+ */
+ DropFlowFileStatus dropFlowFiles();
+
+ /**
+ * Returns the current status of a Drop FlowFile Request that was initiated via the
+ * {@link #dropFlowFiles()} method that has the given identifier
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @return the status for the request with the given identifier, or <code>null</code> if no
+ * request status exists with that identifier
+ */
+ DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier);
+
+ /**
+ * Cancels the request to drop FlowFiles that has the given identifier
+ *
+ * @param requestIdentifier the identifier of the Drop FlowFile Request
+ * @return <code>true</code> if the request was canceled, <code>false</code> if the request has
+ * already completed or is not known
+ */
+ boolean cancelDropFlowFileRequest(String requestIdentifier);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
new file mode 100644
index 0000000..42d8416
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+/**
+ *
+ */
+public class QueueSize {
+
+ private final int objectCount;
+ private final long totalSizeBytes;
+
+ public QueueSize(final int numberObjects, final long totalSizeBytes) {
+ if (numberObjects < 0 || totalSizeBytes < 0) {
+ throw new IllegalArgumentException();
+ }
+ objectCount = numberObjects;
+ this.totalSizeBytes = totalSizeBytes;
+ }
+
+ /**
+ * @return number of objects present on the queue
+ */
+ public int getObjectCount() {
+ return objectCount;
+ }
+
+ /**
+ * @return total size in bytes of the content for the data on the queue
+ */
+ public long getByteCount() {
+ return totalSizeBytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 58fc6b3..906cbe2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 2e5be11..57e9186 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,8 +16,11 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
/**
* Defines a mechanism by which FlowFiles can be move into external storage or
@@ -26,38 +29,81 @@ import org.apache.nifi.events.EventReporter;
public interface FlowFileSwapManager {
/**
- * Starts the Manager's background threads to start swapping FlowFiles in
- * and out of memory
+ * Initializes the Swap Manager, providing a {@link SwapManagerInitializationContext} so that the
+ * Swap Manager has access to all of the components necessary to perform its functions
*
- * @param flowFileRepository the FlowFileRepository that must be notified of
- * any swapping in or out of FlowFiles
- * @param queueProvider the provider of FlowFileQueue's so that FlowFiles
- * can be obtained and restored
- * @param claimManager the ContentClaimManager to use for interacting with
- * Content Claims
- * @param reporter the EventReporter that can be used for notifying users of
- * important events
+ * @param initializationContext the context the provides the swap manager with access to the
+ * resources that it needs to perform its functions
*/
- void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
+ void initialize(SwapManagerInitializationContext initializationContext);
/**
- * Shuts down the manager
+ * Swaps out the given FlowFiles that belong to the queue with the given identifier.
+ *
+ * @param flowFiles the FlowFiles to swap out to external storage
+ * @param flowFileQueue the queue that the FlowFiles belong to
+ * @return the location of the externally stored swap file
+ *
+ * @throws IOException if unable to swap the FlowFiles out
*/
- void shutdown();
+ String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException;
/**
- * Removes all Swap information, permanently destroying any FlowFiles that
- * have been swapped out
+ * Recovers the SwapFiles from the swap file that lives at the given location. This action
+ * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
+ * at the given location remains in that location and the FlowFile Repository is not updated.
+ *
+ * @param swapLocation the location of hte swap file
+ * @param flowFileQueue the queue that the FlowFiles belong to
+ * @return the FlowFiles that live at the given swap location
+ *
+ * @throws IOException if unable to recover the FlowFiles from the given location
*/
- void purge();
+ List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException;
/**
- * Notifies FlowFile queues of the number of FlowFiles and content size of
- * all FlowFiles that are currently swapped out
+ * Recovers the FlowFiles from the swap file that lives at the given location and belongs
+ * to the FlowFile Queue with the given identifier. The FlowFile Repository is then updated
+ * and the swap file is permanently removed from the external storage
+ *
+ * @param swapLocation the location of the swap file
+ * @param flowFileQueue the queue to which the FlowFiles belong
+ *
+ * @return the FlowFiles that are stored in the given location
+ *
+ * @throws IOException if unable to recover the FlowFiles from the given location or update the
+ * FlowFileRepository
+ */
+ List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException;
+
+ /**
+ * Determines swap files that exist for the given FlowFileQueue
+ *
+ * @param flowFileQueue the queue for which the FlowFiles should be recovered
*
- * @param connectionProvider provider
- * @param claimManager manager
- * @return how many flowfiles have been recovered
+ * @return all swap locations that have been identified for the given queue, in the order that they should
+ * be swapped back in
*/
- long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
+ List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
+
+ /**
+ * Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location
+ *
+ * @param swapLocation the location of the swap file
+ * @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out
+ */
+ QueueSize getSwapSize(String swapLocation) throws IOException;
+
+ /**
+ * Returns the maximum record id of the FlowFiles stored at the given swap location
+ *
+ * @param swapLocation the swap location to read id's from
+ * @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found
+ */
+ Long getMaxRecordId(String swapLocation) throws IOException;
+
+ /**
+ * Purge all known Swap Files without updating FlowFileRepository or Provenance Repository
+ */
+ void purge();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
index fcb516d..95d9f2e 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
@@ -18,7 +18,7 @@ package org.apache.nifi.controller.repository;
import java.util.Collection;
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
/**
* Provides a collection of <code>FlowFileQueue</code>s that represents all
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
index 40d44a8..09202c0 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.controller.repository;
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
new file mode 100644
index 0000000..564d5ec
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+
+public interface SwapManagerInitializationContext {
+
+ /**
+ * @return the {@link FlowFileRepository} that should be updated when FlowFiles are swapped in and out
+ */
+ FlowFileRepository getFlowFileRepository();
+
+
+ /**
+ * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when
+ * performing swapping actions
+ */
+ ResourceClaimManager getResourceClaimManager();
+
+ /**
+ * @return an {@link EventReporter} that can be used to report events to users
+ */
+ EventReporter getEventReporter();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index ed46d68..ebd56a9 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
deleted file mode 100644
index c3c2ccc..0000000
--- a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor;
-
-/**
- *
- */
-public class QueueSize {
-
- private final int objectCount;
- private final long totalSizeBytes;
-
- public QueueSize(final int numberObjects, final long totalSizeBytes) {
- if (numberObjects < 0 || totalSizeBytes < 0) {
- throw new IllegalArgumentException();
- }
- objectCount = numberObjects;
- this.totalSizeBytes = totalSizeBytes;
- }
-
- /**
- * @return number of objects present on the queue
- */
- public int getObjectCount() {
- return objectCount;
- }
-
- /**
- * @return total size in bytes of the content for the data on the queue
- */
- public long getByteCount() {
- return totalSizeBytes;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
index 0a0089d..2e66905 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
@@ -20,7 +20,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index f47ea2f..df356fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,21 +35,25 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
-import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,12 +89,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private boolean swapMode = false;
private long maximumQueueObjectCount;
+ private final EventReporter eventReporter;
private final AtomicLong flowFileExpirationMillis;
private final Connection connection;
private final AtomicReference<String> flowFileExpirationPeriod;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final List<FlowFilePrioritizer> priorities;
private final int swapThreshold;
+ private final FlowFileSwapManager swapManager;
private final TimedLock readLock;
private final TimedLock writeLock;
private final String identifier;
@@ -101,7 +108,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
private final ProcessScheduler scheduler;
- public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) {
+ public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
+ final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>();
maximumQueueObjectCount = 0L;
@@ -110,6 +118,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
flowFileExpirationMillis = new AtomicLong(0);
flowFileExpirationPeriod = new AtomicReference<>("0 mins");
swapQueue = new ArrayList<>();
+ this.eventReporter = eventReporter;
+ this.swapManager = swapManager;
this.identifier = identifier;
this.swapThreshold = swapThreshold;
@@ -233,21 +243,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
- public long contentSize() {
- readLock.lock();
- try {
- final PreFetch prefetch = preFetchRef.get();
- if (prefetch == null) {
- return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount();
- } else {
- return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount();
- }
- } finally {
- readLock.unlock("getContentSize");
- }
- }
-
- @Override
public boolean isEmpty() {
readLock.lock();
try {
@@ -945,11 +940,88 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
this.flowFileExpirationMillis.set(millis);
}
+
+ @Override
+ public void purgeSwapFiles() {
+ swapManager.purge();
+ }
+
+ @Override
+ public Long recoverSwappedFlowFiles() {
+ int swapFlowFileCount = 0;
+ long swapByteCount = 0L;
+ Long maxId = null;
+
+ writeLock.lock();
+ try {
+ final List<String> swapLocations;
+ try {
+ swapLocations = swapManager.recoverSwapLocations(this);
+ } catch (final IOException ioe) {
+ logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier());
+ logger.error("", ioe);
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
+ getIdentifier() + "; see logs for more detials");
+ }
+ return null;
+ }
+
+ for (final String swapLocation : swapLocations) {
+ try {
+ final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
+ final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
+ if (maxSwapRecordId != null) {
+ if (maxId == null || maxSwapRecordId > maxId) {
+ maxId = maxSwapRecordId;
+ }
+ }
+
+ swapFlowFileCount += queueSize.getObjectCount();
+ swapByteCount += queueSize.getByteCount();
+ } catch (final IOException ioe) {
+ logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
+ logger.error("", ioe);
+ if (eventReporter != null) {
+ eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
+ "; the file appears to be corrupt. See logs for more details");
+ }
+ }
+ }
+
+ this.swappedRecordCount = swapFlowFileCount;
+ this.swappedContentSize = swapByteCount;
+ } finally {
+ writeLock.unlock("Recover Swap Files");
+ }
+
+ return maxId;
+ }
+
+
@Override
public String toString() {
return "FlowFileQueue[id=" + identifier + "]";
}
+ @Override
+ public DropFlowFileStatus dropFlowFiles() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean cancelDropFlowFileRequest(String requestIdentifier) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
/**
* Lock the queue so that other threads are unable to interact with the
* queue
http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index ad556e2..f0a6d8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -26,18 +26,19 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.NiFiProperties;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
/**
* Models a connection between connectable components. A connection may contain one or more relationships that map the source component to the destination component.
*/
@@ -65,7 +66,7 @@ public final class StandardConnection implements Connection {
destination = new AtomicReference<>(builder.destination);
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
- flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold());
+ flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@@ -259,6 +260,8 @@ public final class StandardConnection implements Connection {
private Connectable source;
private Connectable destination;
private Collection<Relationship> relationships;
+ private FlowFileSwapManager swapManager;
+ private EventReporter eventReporter;
public Builder(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
@@ -305,6 +308,16 @@ public final class StandardConnection implements Connection {
return this;
}
+ public Builder swapManager(final FlowFileSwapManager swapManager) {
+ this.swapManager = swapManager;
+ return this;
+ }
+
+ public Builder eventReporter(final EventReporter eventReporter) {
+ this.eventReporter = eventReporter;
+ return this;
+ }
+
public StandardConnection build() {
if (source == null) {
throw new IllegalStateException("Cannot build a Connection without a Source");
@@ -312,6 +325,9 @@ public final class StandardConnection implements Connection {
if (destination == null) {
throw new IllegalStateException("Cannot build a Connection without a Destination");
}
+ if (swapManager == null) {
+ throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
+ }
if (relationships == null) {
relationships = new ArrayList<>();