You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:07 UTC
[11/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
deleted file mode 100644
index 5eab4d9..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ /dev/null
@@ -1,1572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-import org.apache.nifi.controller.queue.DropFlowFileStatus;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.FlowFileSummary;
-import org.apache.nifi.controller.queue.ListFlowFileRequest;
-import org.apache.nifi.controller.queue.ListFlowFileState;
-import org.apache.nifi.controller.queue.ListFlowFileStatus;
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.IncompleteSwapFileException;
-import org.apache.nifi.controller.repository.RepositoryRecord;
-import org.apache.nifi.controller.repository.RepositoryRecordType;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.swap.StandardSwapSummary;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
-import org.apache.nifi.provenance.ProvenanceEventBuilder;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.concurrency.TimedLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
- * processing. Must be thread safe.
- *
- */
-public class StandardFlowFileQueue implements FlowFileQueue {
-
- public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
- public static final int SWAP_RECORD_POLL_SIZE = 10000;
-
- private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
-
- private PriorityQueue<FlowFileRecord> activeQueue = null;
-
- // guarded by lock
- private ArrayList<FlowFileRecord> swapQueue = null;
-
- private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L));
-
- private boolean swapMode = false;
-
- private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>();
- private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
-
- private final EventReporter eventReporter;
- private final Connection connection;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private final List<FlowFilePrioritizer> priorities;
- private final int swapThreshold;
- private final FlowFileSwapManager swapManager;
- private final List<String> swapLocations = new ArrayList<>();
- private final TimedLock readLock;
- private final TimedLock writeLock;
- private final String identifier;
- private final FlowFileRepository flowFileRepository;
- private final ProvenanceEventRepository provRepository;
- private final ResourceClaimManager resourceClaimManager;
-
- private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
- private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
-
- // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
- private final ProcessScheduler scheduler;
-
- public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
- final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
- final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
- activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
- priorities = new ArrayList<>();
- swapQueue = new ArrayList<>();
- this.eventReporter = eventReporter;
- this.swapManager = swapManager;
- this.flowFileRepository = flowFileRepo;
- this.provRepository = provRepo;
- this.resourceClaimManager = resourceClaimManager;
-
- this.identifier = identifier;
- this.swapThreshold = swapThreshold;
- this.scheduler = scheduler;
- this.connection = connection;
-
- readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
- writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
-
- final MaxQueueSize initialMaxQueueSize = new MaxQueueSize(defaultBackPressureDataSizeThreshold,
- DataUnit.parseDataSize(defaultBackPressureDataSizeThreshold, DataUnit.B).longValue(), defaultBackPressureObjectThreshold);
- this.maxQueueSize.set(initialMaxQueueSize);
- }
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public List<FlowFilePrioritizer> getPriorities() {
- return Collections.unmodifiableList(priorities);
- }
-
- @Override
- public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
- writeLock.lock();
- try {
- final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(newPriorities));
- newQueue.addAll(activeQueue);
- activeQueue = newQueue;
- priorities.clear();
- priorities.addAll(newPriorities);
- } finally {
- writeLock.unlock("setPriorities");
- }
- }
-
- @Override
- public void setBackPressureObjectThreshold(final long threshold) {
- boolean updated = false;
- while (!updated) {
- MaxQueueSize maxSize = maxQueueSize.get();
- final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
- updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
- }
- }
-
- @Override
- public long getBackPressureObjectThreshold() {
- return maxQueueSize.get().getMaxCount();
- }
-
- @Override
- public void setBackPressureDataSizeThreshold(final String maxDataSize) {
- final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
-
- boolean updated = false;
- while (!updated) {
- MaxQueueSize maxSize = maxQueueSize.get();
- final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
- updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
- }
- }
-
- @Override
- public String getBackPressureDataSizeThreshold() {
- return maxQueueSize.get().getMaxSize();
- }
-
- @Override
- public QueueSize size() {
- return getQueueSize();
- }
-
-
- private QueueSize getQueueSize() {
- return size.get().toQueueSize();
- }
-
- @Override
- public boolean isEmpty() {
- return size.get().isEmpty();
- }
-
- @Override
- public boolean isActiveQueueEmpty() {
- final FlowFileQueueSize queueSize = size.get();
- return queueSize.activeQueueCount == 0 && queueSize.swappedCount == 0;
- }
-
- @Override
- public QueueSize getActiveQueueSize() {
- return size.get().activeQueueSize();
- }
-
- @Override
- public QueueSize getSwapQueueSize() {
- return size.get().swapQueueSize();
- }
-
- @Override
- public int getSwapFileCount() {
- readLock.lock();
- try {
- return this.swapLocations.size();
- } finally {
- readLock.unlock("getSwapFileCount");
- }
- }
-
- @Override
- public boolean isAllActiveFlowFilesPenalized() {
- readLock.lock();
- try {
- // If there are no elements then we return false
- if (activeQueue.isEmpty()) {
- return false;
- }
-
- // If the first element on the queue is penalized, then we know they all are,
- // because our Comparator will put Penalized FlowFiles at the end. If the first
- // FlowFile is not penalized, then we also know that they are not all penalized,
- // so we can simplify this by looking solely at the first FlowFile in the queue.
- final FlowFileRecord first = activeQueue.peek();
- return first.isPenalized();
- } finally {
- readLock.unlock("isAllActiveFlowFilesPenalized");
- }
- }
-
- @Override
- public boolean isAnyActiveFlowFilePenalized() {
- readLock.lock();
- try {
- return activeQueue.stream().anyMatch(FlowFileRecord::isPenalized);
- } finally {
- readLock.unlock("isAnyActiveFlowFilePenalized");
- }
- }
-
- @Override
- public void acknowledge(final FlowFileRecord flowFile) {
- incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
-
- if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- // queue was full but no longer is. Notify that the source may now be available to run,
- // because of back pressure caused by this queue.
- scheduler.registerEvent(connection.getSource());
- }
- }
-
- @Override
- public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
- long totalSize = 0L;
- for (final FlowFileRecord flowFile : flowFiles) {
- totalSize += flowFile.getSize();
- }
-
- incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
-
- if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
- // because of back pressure caused by this queue.
- scheduler.registerEvent(connection.getSource());
- }
- }
-
- @Override
- public boolean isFull() {
- final MaxQueueSize maxSize = maxQueueSize.get();
-
- // Check if max size is set
- if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
- return false;
- }
-
- final QueueSize queueSize = getQueueSize();
- if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
- return true;
- }
-
- if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
- return true;
- }
-
- return false;
- }
-
-
- @Override
- public void put(final FlowFileRecord file) {
- writeLock.lock();
- try {
- if (swapMode || activeQueue.size() >= swapThreshold) {
- swapQueue.add(file);
- incrementSwapQueueSize(1, file.getSize(), 0);
- swapMode = true;
- writeSwapFilesIfNecessary();
- } else {
- incrementActiveQueueSize(1, file.getSize());
- activeQueue.add(file);
- }
- } finally {
- writeLock.unlock("put(FlowFileRecord)");
- }
-
- if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- scheduler.registerEvent(connection.getDestination());
- }
- }
-
- @Override
- public void putAll(final Collection<FlowFileRecord> files) {
- final int numFiles = files.size();
- long bytes = 0L;
- for (final FlowFile flowFile : files) {
- bytes += flowFile.getSize();
- }
-
- writeLock.lock();
- try {
- if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
- swapQueue.addAll(files);
- incrementSwapQueueSize(numFiles, bytes, 0);
- swapMode = true;
- writeSwapFilesIfNecessary();
- } else {
- incrementActiveQueueSize(numFiles, bytes);
- activeQueue.addAll(files);
- }
- } finally {
- writeLock.unlock("putAll");
- }
-
- if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- scheduler.registerEvent(connection.getDestination());
- }
- }
-
-
- private boolean isLaterThan(final Long maxAge) {
- if (maxAge == null) {
- return false;
- }
- return maxAge < System.currentTimeMillis();
- }
-
- private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
- if (flowFile == null) {
- return null;
- }
- if (expirationMillis <= 0) {
- return null;
- } else {
- final long entryDate = flowFile.getEntryDate();
- final long expirationDate = entryDate + expirationMillis;
- return expirationDate;
- }
- }
-
- @Override
- public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
- FlowFileRecord flowFile = null;
-
- // First check if we have any records Pre-Fetched.
- final long expirationMillis = expirationPeriod.get().getMillis();
- writeLock.lock();
- try {
- flowFile = doPoll(expiredRecords, expirationMillis);
- return flowFile;
- } finally {
- writeLock.unlock("poll(Set)");
-
- if (flowFile != null) {
- incrementUnacknowledgedQueueSize(1, flowFile.getSize());
- }
- }
- }
-
- private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
- FlowFileRecord flowFile;
- boolean isExpired;
-
- migrateSwapToActive();
-
- long expiredBytes = 0L;
- do {
- flowFile = this.activeQueue.poll();
-
- isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
- if (isExpired) {
- expiredRecords.add(flowFile);
- expiredBytes += flowFile.getSize();
- flowFile = null;
-
- if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
- break;
- }
- } else if (flowFile != null && flowFile.isPenalized()) {
- this.activeQueue.add(flowFile);
- flowFile = null;
- break;
- }
-
- if (flowFile != null) {
- incrementActiveQueueSize(-1, -flowFile.getSize());
- }
- }
- while (isExpired);
-
- if (!expiredRecords.isEmpty()) {
- incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
- }
-
- return flowFile;
- }
-
- @Override
- public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
- final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults));
-
- // First check if we have any records Pre-Fetched.
- writeLock.lock();
- try {
- doPoll(records, maxResults, expiredRecords);
- } finally {
- writeLock.unlock("poll(int, Set)");
- }
- return records;
- }
-
- private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
- migrateSwapToActive();
-
- final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
-
- long expiredBytes = 0L;
- for (final FlowFileRecord record : expiredRecords) {
- expiredBytes += record.getSize();
- }
-
- incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
- incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
- }
-
- /**
- * If there are FlowFiles waiting on the swap queue, move them to the active
- * queue until we meet our threshold. This prevents us from having to swap
- * them to disk & then back out.
- *
- * This method MUST be called with the writeLock held.
- */
- private void migrateSwapToActive() {
- // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't
- // have to swap them out & then swap them back in.
- // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles
- // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out.
- // In particular, this can happen if the queue is typically filled with surges.
- // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave
- // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in,
- // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and
- // swapped back in again.
- // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
- // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
- // to disk, because we want them to be swapped back in in the same order that they were swapped out.
-
- final int activeQueueSize = activeQueue.size();
- if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
- return;
- }
-
- // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that
- // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the
- // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot
- // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
- // first.
- if (!swapLocations.isEmpty()) {
- final String swapLocation = swapLocations.get(0);
- boolean partialContents = false;
- SwapContents swapContents = null;
- try {
- swapContents = swapManager.swapIn(swapLocation, this);
- swapLocations.remove(0);
- } catch (final IncompleteSwapFileException isfe) {
- logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation);
- logger.error("", isfe);
- swapContents = isfe.getPartialContents();
- partialContents = true;
- swapLocations.remove(0);
- } catch (final FileNotFoundException fnfe) {
- logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
- }
-
- swapLocations.remove(0);
- return;
- } catch (final IOException ioe) {
- logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
- logger.error("", ioe);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
- swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
- }
-
- // We do not remove the Swap File from swapLocations because the IOException may be recoverable later. For instance, the file may be on a network
- // drive and we may have connectivity problems, etc.
- return;
- } catch (final Throwable t) {
- logger.error("Failed to swap in FlowFiles from Swap File {}", swapLocation, t);
-
- // We do not remove the Swap File from swapLocations because this is an unexpected failure that may be retry-able. For example, if there were
- // an OOME, etc. then we don't want to he queue to still reflect that the data is around but never swap it in. By leaving the Swap File
- // in swapLocations, we will continue to retry.
- throw t;
- }
-
- final QueueSize swapSize = swapContents.getSummary().getQueueSize();
- final long contentSize = swapSize.getByteCount();
- final int flowFileCount = swapSize.getObjectCount();
- incrementSwapQueueSize(-flowFileCount, -contentSize, -1);
-
- if (partialContents) {
- // if we have partial results, we need to calculate the content size of the flowfiles
- // actually swapped back in.
- long contentSizeSwappedIn = 0L;
- for (final FlowFileRecord swappedIn : swapContents.getFlowFiles()) {
- contentSizeSwappedIn += swappedIn.getSize();
- }
-
- incrementActiveQueueSize(swapContents.getFlowFiles().size(), contentSizeSwappedIn);
- } else {
- // we swapped in the whole swap file. We can just use the info that we got from the summary.
- incrementActiveQueueSize(flowFileCount, contentSize);
- }
-
- activeQueue.addAll(swapContents.getFlowFiles());
- return;
- }
-
- // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
- // of other checks for 99.999% of the cases.
- if (size.get().swappedCount == 0 && swapQueue.isEmpty()) {
- return;
- }
-
- if (size.get().swappedCount > swapQueue.size()) {
- // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
- // the files to be swapped back in first
- return;
- }
-
- int recordsMigrated = 0;
- long bytesMigrated = 0L;
- final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
- while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
- final FlowFileRecord toMigrate = swapItr.next();
- activeQueue.add(toMigrate);
- bytesMigrated += toMigrate.getSize();
- recordsMigrated++;
- swapItr.remove();
- }
-
- if (recordsMigrated > 0) {
- incrementActiveQueueSize(recordsMigrated, bytesMigrated);
- incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
- }
-
- if (size.get().swappedCount == 0) {
- swapMode = false;
- }
- }
-
- /**
- * This method MUST be called with the write lock held
- */
- private void writeSwapFilesIfNecessary() {
- if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
- return;
- }
-
- migrateSwapToActive();
-
- final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
-
- int originalSwapQueueCount = swapQueue.size();
- long originalSwapQueueBytes = 0L;
- for (final FlowFileRecord flowFile : swapQueue) {
- originalSwapQueueBytes += flowFile.getSize();
- }
-
- // Create a new Priority queue with the prioritizers that are set, but reverse the
- // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
- final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities)));
- tempQueue.addAll(activeQueue);
- tempQueue.addAll(swapQueue);
-
- long bytesSwappedOut = 0L;
- int flowFilesSwappedOut = 0;
- final List<String> swapLocations = new ArrayList<>(numSwapFiles);
- for (int i = 0; i < numSwapFiles; i++) {
- // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
- final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
- for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
- final FlowFileRecord flowFile = tempQueue.poll();
- toSwap.add(flowFile);
- bytesSwappedOut += flowFile.getSize();
- flowFilesSwappedOut++;
- }
-
- try {
- Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
- final String swapLocation = swapManager.swapOut(toSwap, this);
- swapLocations.add(swapLocation);
- } catch (final IOException ioe) {
- tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
- logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting "
- + "the Java heap space but failed to write information to disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), ioe.toString());
- logger.error("", ioe);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has " + getQueueSize().getObjectCount() +
- " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. "
- + "See logs for more information.");
- }
-
- break;
- }
- }
-
- // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
- // swap queue. Then add the records back to the active queue.
- swapQueue.clear();
- long updatedSwapQueueBytes = 0L;
- while (tempQueue.size() > swapThreshold) {
- final FlowFileRecord record = tempQueue.poll();
- swapQueue.add(record);
- updatedSwapQueueBytes += record.getSize();
- }
-
- Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
-
- // replace the contents of the active queue, since we've merged it with the swap queue.
- activeQueue.clear();
- FlowFileRecord toRequeue;
- long activeQueueBytes = 0L;
- while ((toRequeue = tempQueue.poll()) != null) {
- activeQueue.offer(toRequeue);
- activeQueueBytes += toRequeue.getSize();
- }
-
- boolean updated = false;
- while (!updated) {
- final FlowFileQueueSize originalSize = size.get();
-
- final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount;
- final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
-
- final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
- originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut,
- originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut,
- originalSize.swapFiles + numSwapFiles,
- originalSize.unacknowledgedCount, originalSize.unacknowledgedBytes);
- updated = size.compareAndSet(originalSize, newSize);
- }
-
- this.swapLocations.addAll(swapLocations);
- }
-
-
- @Override
- public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
- long drainedSize = 0L;
- FlowFileRecord pulled = null;
-
- final long expirationMillis = expirationPeriod.get().getMillis();
- while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
- if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
- expiredRecords.add(pulled);
- if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
- break;
- }
- } else {
- if (pulled.isPenalized()) {
- sourceQueue.add(pulled);
- break;
- }
- destination.add(pulled);
- }
- drainedSize += pulled.getSize();
- }
- return drainedSize;
- }
-
- @Override
- public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
- long bytesPulled = 0L;
- int flowFilesPulled = 0;
-
- writeLock.lock();
- try {
- migrateSwapToActive();
-
- final long expirationMillis = expirationPeriod.get().getMillis();
-
- final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
- final List<FlowFileRecord> unselected = new ArrayList<>();
-
- while (true) {
- FlowFileRecord flowFile = this.activeQueue.poll();
- if (flowFile == null) {
- break;
- }
-
- final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
- if (isExpired) {
- expiredRecords.add(flowFile);
- bytesPulled += flowFile.getSize();
- flowFilesPulled++;
-
- if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
- break;
- } else {
- continue;
- }
- } else if (flowFile.isPenalized()) {
- this.activeQueue.add(flowFile);
- flowFile = null;
- break; // just stop searching because the rest are all penalized.
- }
-
- final FlowFileFilterResult result = filter.filter(flowFile);
- if (result.isAccept()) {
- bytesPulled += flowFile.getSize();
- flowFilesPulled++;
-
- incrementUnacknowledgedQueueSize(1, flowFile.getSize());
- selectedFlowFiles.add(flowFile);
- } else {
- unselected.add(flowFile);
- }
-
- if (!result.isContinue()) {
- break;
- }
- }
-
- this.activeQueue.addAll(unselected);
- incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
-
- return selectedFlowFiles;
- } finally {
- writeLock.unlock("poll(Filter, Set)");
- }
- }
-
-
-
- private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
-
- private static final long serialVersionUID = 1L;
- private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
-
- private Prioritizer(final List<FlowFilePrioritizer> priorities) {
- if (null != priorities) {
- prioritizers.addAll(priorities);
- }
- }
-
- @Override
- public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
- int returnVal = 0;
- final boolean f1Penalized = f1.isPenalized();
- final boolean f2Penalized = f2.isPenalized();
-
- if (f1Penalized && !f2Penalized) {
- return 1;
- } else if (!f1Penalized && f2Penalized) {
- return -1;
- }
-
- if (f1Penalized && f2Penalized) {
- if (f1.getPenaltyExpirationMillis() < f2.getPenaltyExpirationMillis()) {
- return -1;
- } else if (f1.getPenaltyExpirationMillis() > f2.getPenaltyExpirationMillis()) {
- return 1;
- }
- }
-
- if (!prioritizers.isEmpty()) {
- for (final FlowFilePrioritizer prioritizer : prioritizers) {
- returnVal = prioritizer.compare(f1, f2);
- if (returnVal != 0) {
- return returnVal;
- }
- }
- }
-
- final ContentClaim claim1 = f1.getContentClaim();
- final ContentClaim claim2 = f2.getContentClaim();
-
- // put the one without a claim first
- if (claim1 == null && claim2 != null) {
- return -1;
- } else if (claim1 != null && claim2 == null) {
- return 1;
- } else if (claim1 != null && claim2 != null) {
- final int claimComparison = claim1.compareTo(claim2);
- if (claimComparison != 0) {
- return claimComparison;
- }
-
- final int claimOffsetComparison = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
- if (claimOffsetComparison != 0) {
- return claimOffsetComparison;
- }
- }
-
- return Long.compare(f1.getId(), f2.getId());
- }
- }
-
- @Override
- public String getFlowFileExpiration() {
- return expirationPeriod.get().getPeriod();
- }
-
- @Override
- public int getFlowFileExpiration(final TimeUnit timeUnit) {
- return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void setFlowFileExpiration(final String flowExpirationPeriod) {
- final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
- if (millis < 0) {
- throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
- }
-
- expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
- }
-
-
- @Override
- public void purgeSwapFiles() {
- swapManager.purge();
- }
-
- @Override
- public SwapSummary recoverSwappedFlowFiles() {
- int swapFlowFileCount = 0;
- long swapByteCount = 0L;
- Long maxId = null;
- List<ResourceClaim> resourceClaims = new ArrayList<>();
- final long startNanos = System.nanoTime();
-
- writeLock.lock();
- try {
- final List<String> swapLocations;
- try {
- swapLocations = swapManager.recoverSwapLocations(this);
- } catch (final IOException ioe) {
- logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier());
- logger.error("", ioe);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
- getIdentifier() + "; see logs for more detials");
- }
- return null;
- }
-
- for (final String swapLocation : swapLocations) {
- try {
- final SwapSummary summary = swapManager.getSwapSummary(swapLocation);
- final QueueSize queueSize = summary.getQueueSize();
- final Long maxSwapRecordId = summary.getMaxFlowFileId();
- if (maxSwapRecordId != null) {
- if (maxId == null || maxSwapRecordId > maxId) {
- maxId = maxSwapRecordId;
- }
- }
-
- swapFlowFileCount += queueSize.getObjectCount();
- swapByteCount += queueSize.getByteCount();
- resourceClaims.addAll(summary.getResourceClaims());
- } catch (final IOException ioe) {
- logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
- logger.error("", ioe);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
- "; the file appears to be corrupt. See logs for more details");
- }
- }
- }
-
- incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size());
- this.swapLocations.addAll(swapLocations);
- } finally {
- writeLock.unlock("Recover Swap Files");
- }
-
- if (!swapLocations.isEmpty()) {
- final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size(), this, millis);
- }
-
- return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
- }
-
-
- @Override
- public String toString() {
- return "FlowFileQueue[id=" + identifier + "]";
- }
-
-
- @Override
- public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
- // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
- if (listRequestMap.size() > 10) {
- final List<String> toDrop = new ArrayList<>();
- for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) {
- final ListFlowFileRequest request = entry.getValue();
- final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
-
- if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
- toDrop.add(entry.getKey());
- }
- }
-
- for (final String requestId : toDrop) {
- listRequestMap.remove(requestId);
- }
- }
-
- // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue.
- final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());
-
- final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- int position = 0;
- int resultCount = 0;
- final List<FlowFileSummary> summaries = new ArrayList<>();
-
- // Create an ArrayList that contains all of the contents of the active queue.
- // We do this so that we don't have to hold the lock any longer than absolutely necessary.
- // We cannot simply pull the first 'maxResults' records from the queue, however, because the
- // Iterator provided by PriorityQueue does not return records in order. So we would have to either
- // use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and
- // do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do
- // the sorting to put the records back. So even though this has an expensive of Java Heap to create the
- // extra collection, we are making this trade-off to avoid locking the queue any longer than required.
- final List<FlowFileRecord> allFlowFiles;
- final Prioritizer prioritizer;
- readLock.lock();
- try {
- logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this);
- allFlowFiles = new ArrayList<>(activeQueue);
- prioritizer = new Prioritizer(StandardFlowFileQueue.this.priorities);
- } finally {
- readLock.unlock("List FlowFiles");
- }
-
- listRequest.setState(ListFlowFileState.CALCULATING_LIST);
-
- // sort the FlowFileRecords so that we have the list in the same order as on the queue.
- Collections.sort(allFlowFiles, prioritizer);
-
- for (final FlowFileRecord flowFile : allFlowFiles) {
- summaries.add(summarize(flowFile, ++position));
- if (summaries.size() >= maxResults) {
- break;
- }
- }
-
- logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount);
- listRequest.setFlowFileSummaries(summaries);
- listRequest.setState(ListFlowFileState.COMPLETE);
- }
- }, "List FlowFiles for Connection " + getIdentifier());
- t.setDaemon(true);
- t.start();
-
- listRequestMap.put(requestIdentifier, listRequest);
- return listRequest;
- }
-
- private FlowFileSummary summarize(final FlowFile flowFile, final int position) {
- // extract all of the information that we care about into new variables rather than just
- // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
- // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
- // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well,
- // which can be problematic if we expect them to be swapped out.
- final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
- final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
- final long size = flowFile.getSize();
- final Long lastQueuedTime = flowFile.getLastQueueDate();
- final long lineageStart = flowFile.getLineageStartDate();
- final boolean penalized = flowFile.isPenalized();
-
- return new FlowFileSummary() {
- @Override
- public String getUuid() {
- return uuid;
- }
-
- @Override
- public String getFilename() {
- return filename;
- }
-
- @Override
- public int getPosition() {
- return position;
- }
-
- @Override
- public long getSize() {
- return size;
- }
-
- @Override
- public long getLastQueuedTime() {
- return lastQueuedTime == null ? 0L : lastQueuedTime;
- }
-
- @Override
- public long getLineageStartDate() {
- return lineageStart;
- }
-
- @Override
- public boolean isPenalized() {
- return penalized;
- }
- };
- }
-
-
- @Override
- public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
- return listRequestMap.get(requestIdentifier);
- }
-
- @Override
- public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
- logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
- final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier);
- if (request != null) {
- request.cancel();
- }
-
- return request;
- }
-
- @Override
- public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
- if (flowFileUuid == null) {
- return null;
- }
-
- readLock.lock();
- try {
- // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID
- for (final FlowFileRecord flowFile : activeQueue) {
- if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
- return flowFile;
- }
- }
- } finally {
- readLock.unlock("getFlowFile");
- }
-
- return null;
- }
-
-
- @Override
- public void verifyCanList() throws IllegalStateException {
- }
-
- @Override
- public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
- logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
-
- // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
- if (dropRequestMap.size() > 10) {
- final List<String> toDrop = new ArrayList<>();
- for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
- final DropFlowFileRequest request = entry.getValue();
- final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
-
- if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
- toDrop.add(entry.getKey());
- }
- }
-
- for (final String requestId : toDrop) {
- dropRequestMap.remove(requestId);
- }
- }
-
- final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
- final QueueSize originalSize = getQueueSize();
- dropRequest.setCurrentSize(originalSize);
- dropRequest.setOriginalSize(originalSize);
- if (originalSize.getObjectCount() == 0) {
- dropRequest.setDroppedSize(originalSize);
- dropRequest.setState(DropFlowFileState.COMPLETE);
- dropRequestMap.put(requestIdentifier, dropRequest);
- return dropRequest;
- }
-
- final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- writeLock.lock();
- try {
- dropRequest.setState(DropFlowFileState.DROPPING_FLOWFILES);
- logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, getQueueSize());
-
- try {
- final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
-
- QueueSize droppedSize;
- try {
- if (dropRequest.getState() == DropFlowFileState.CANCELED) {
- logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
- return;
- }
-
- droppedSize = drop(activeQueueRecords, requestor);
- logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
- } catch (final IOException ioe) {
- logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
- logger.error("", ioe);
-
- dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
- return;
- }
-
- activeQueue.clear();
- incrementActiveQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
- dropRequest.setCurrentSize(getQueueSize());
- dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
-
- final QueueSize swapSize = size.get().swapQueueSize();
- logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
- requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
- if (dropRequest.getState() == DropFlowFileState.CANCELED) {
- logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
- return;
- }
-
- try {
- droppedSize = drop(swapQueue, requestor);
- } catch (final IOException ioe) {
- logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
- logger.error("", ioe);
-
- dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
- return;
- }
-
- swapQueue.clear();
- dropRequest.setCurrentSize(getQueueSize());
- dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
- swapMode = false;
- incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), 0);
- logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize);
-
- final int swapFileCount = swapLocations.size();
- final Iterator<String> swapLocationItr = swapLocations.iterator();
- while (swapLocationItr.hasNext()) {
- final String swapLocation = swapLocationItr.next();
-
- SwapContents swapContents = null;
- try {
- if (dropRequest.getState() == DropFlowFileState.CANCELED) {
- logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
- return;
- }
-
- swapContents = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
- droppedSize = drop(swapContents.getFlowFiles(), requestor);
- } catch (final IncompleteSwapFileException isfe) {
- swapContents = isfe.getPartialContents();
- final String warnMsg = "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the file was corrupt. "
- + "Some FlowFiles may not be dropped from the queue until NiFi is restarted.";
-
- logger.warn(warnMsg);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", warnMsg);
- }
- } catch (final IOException ioe) {
- logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
- swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
- logger.error("", ioe);
- if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, "Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + swapLocation
- + ". The FlowFiles contained in this Swap File will not be dropped from the queue");
- }
-
- dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
- if (swapContents != null) {
- activeQueue.addAll(swapContents.getFlowFiles()); // ensure that we don't lose the FlowFiles from our queue.
- }
-
- return;
- }
-
- dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
- incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), -1);
-
- dropRequest.setCurrentSize(getQueueSize());
- swapLocationItr.remove();
- logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation);
- }
-
- logger.debug("Dropped FlowFiles from {} Swap Files", swapFileCount);
- logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
- dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
- dropRequest.setState(DropFlowFileState.COMPLETE);
- } catch (final Exception e) {
- logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
- logger.error("", e);
- dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
- }
- } finally {
- writeLock.unlock("Drop FlowFiles");
- }
- }
- }, "Drop FlowFiles for Connection " + getIdentifier());
- t.setDaemon(true);
- t.start();
-
- dropRequestMap.put(requestIdentifier, dropRequest);
-
- return dropRequest;
- }
-
- private QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
- // Create a Provenance Event and a FlowFile Repository record for each FlowFile
- final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
- final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
- for (final FlowFileRecord flowFile : flowFiles) {
- provenanceEvents.add(createDropEvent(flowFile, requestor));
- flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
- }
-
- long dropContentSize = 0L;
- for (final FlowFileRecord flowFile : flowFiles) {
- dropContentSize += flowFile.getSize();
- final ContentClaim contentClaim = flowFile.getContentClaim();
- if (contentClaim == null) {
- continue;
- }
-
- final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
- if (resourceClaim == null) {
- continue;
- }
-
- resourceClaimManager.decrementClaimantCount(resourceClaim);
- }
-
- provRepository.registerEvents(provenanceEvents);
- flowFileRepository.updateRepository(flowFileRepoRecords);
- return new QueueSize(flowFiles.size(), dropContentSize);
- }
-
- private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) {
- final ProvenanceEventBuilder builder = provRepository.eventBuilder();
- builder.fromFlowFile(flowFile);
- builder.setEventType(ProvenanceEventType.DROP);
- builder.setLineageStartDate(flowFile.getLineageStartDate());
- builder.setComponentId(getIdentifier());
- builder.setComponentType("Connection");
- builder.setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap());
- builder.setDetails("FlowFile Queue emptied by " + requestor);
- builder.setSourceQueueIdentifier(getIdentifier());
-
- final ContentClaim contentClaim = flowFile.getContentClaim();
- if (contentClaim != null) {
- final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
- builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
- }
-
- return builder.build();
- }
-
- private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
- return new RepositoryRecord() {
- @Override
- public FlowFileQueue getDestination() {
- return null;
- }
-
- @Override
- public FlowFileQueue getOriginalQueue() {
- return StandardFlowFileQueue.this;
- }
-
- @Override
- public RepositoryRecordType getType() {
- return RepositoryRecordType.DELETE;
- }
-
- @Override
- public ContentClaim getCurrentClaim() {
- return flowFile.getContentClaim();
- }
-
- @Override
- public ContentClaim getOriginalClaim() {
- return flowFile.getContentClaim();
- }
-
- @Override
- public long getCurrentClaimOffset() {
- return flowFile.getContentClaimOffset();
- }
-
- @Override
- public FlowFileRecord getCurrent() {
- return flowFile;
- }
-
- @Override
- public boolean isAttributesChanged() {
- return false;
- }
-
- @Override
- public boolean isMarkedForAbort() {
- return false;
- }
-
- @Override
- public String getSwapLocation() {
- return null;
- }
-
- @Override
- public List<ContentClaim> getTransientClaims() {
- return Collections.emptyList();
- }
- };
- }
-
-
- @Override
- public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
- final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
- if (request == null) {
- return null;
- }
-
- request.cancel();
- return request;
- }
-
- @Override
- public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
- return dropRequestMap.get(requestIdentifier);
- }
-
- /**
- * Lock the queue so that other threads are unable to interact with the
- * queue
- */
- public void lock() {
- writeLock.lock();
- }
-
- /**
- * Unlock the queue
- */
- public void unlock() {
- writeLock.unlock("external unlock");
- }
-
- @Override
- public QueueSize getUnacknowledgedQueueSize() {
- return size.get().unacknowledgedQueueSize();
- }
-
- private void incrementActiveQueueSize(final int count, final long bytes) {
- boolean updated = false;
- while (!updated) {
- final FlowFileQueueSize original = size.get();
- final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes,
- original.swappedCount, original.swappedBytes, original.swapFiles, original.unacknowledgedCount, original.unacknowledgedBytes);
- updated = size.compareAndSet(original, newSize);
-
- if (updated) {
- logIfNegative(original, newSize, "active");
- }
- }
- }
-
- private void incrementSwapQueueSize(final int count, final long bytes, final int fileCount) {
- boolean updated = false;
- while (!updated) {
- final FlowFileQueueSize original = size.get();
- final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
- original.swappedCount + count, original.swappedBytes + bytes, original.swapFiles + fileCount, original.unacknowledgedCount, original.unacknowledgedBytes);
- updated = size.compareAndSet(original, newSize);
-
- if (updated) {
- logIfNegative(original, newSize, "swap");
- }
- }
- }
-
- private void incrementUnacknowledgedQueueSize(final int count, final long bytes) {
- boolean updated = false;
- while (!updated) {
- final FlowFileQueueSize original = size.get();
- final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
- original.swappedCount, original.swappedBytes, original.swapFiles, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes);
- updated = size.compareAndSet(original, newSize);
-
- if (updated) {
- logIfNegative(original, newSize, "Unacknowledged");
- }
- }
- }
-
- private void logIfNegative(final FlowFileQueueSize original, final FlowFileQueueSize newSize, final String counterName) {
- if (newSize.activeQueueBytes < 0 || newSize.activeQueueCount < 0 || newSize.swappedBytes < 0 || newSize.swappedCount < 0
- || newSize.unacknowledgedBytes < 0 || newSize.unacknowledgedCount < 0) {
-
- logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, new RuntimeException("Cannot create negative queue size"));
-
- }
- }
-
-
- private static class FlowFileQueueSize {
- private final int activeQueueCount;
- private final long activeQueueBytes;
- private final int swappedCount;
- private final long swappedBytes;
- private final int swapFiles;
- private final int unacknowledgedCount;
- private final long unacknowledgedBytes;
-
- public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, final int swapFileCount,
- final int unacknowledgedCount, final long unacknowledgedBytes) {
- this.activeQueueCount = activeQueueCount;
- this.activeQueueBytes = activeQueueBytes;
- this.swappedCount = swappedCount;
- this.swappedBytes = swappedBytes;
- this.swapFiles = swapFileCount;
- this.unacknowledgedCount = unacknowledgedCount;
- this.unacknowledgedBytes = unacknowledgedBytes;
- }
-
- public boolean isEmpty() {
- return activeQueueCount == 0 && swappedCount == 0 && unacknowledgedCount == 0;
- }
-
- public QueueSize toQueueSize() {
- return new QueueSize(activeQueueCount + swappedCount + unacknowledgedCount, activeQueueBytes + swappedBytes + unacknowledgedBytes);
- }
-
- public QueueSize activeQueueSize() {
- return new QueueSize(activeQueueCount, activeQueueBytes);
- }
-
- public QueueSize unacknowledgedQueueSize() {
- return new QueueSize(unacknowledgedCount, unacknowledgedBytes);
- }
-
- public QueueSize swapQueueSize() {
- return new QueueSize(swappedCount, swappedBytes);
- }
-
- @Override
- public String toString() {
- return "FlowFile Queue Size[ ActiveQueue=[" + activeQueueCount + ", " + activeQueueBytes +
- " Bytes], Swap Queue=[" + swappedCount + ", " + swappedBytes +
- " Bytes], Swap Files=[" + swapFiles + "], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
- }
- }
-
-
- private static class MaxQueueSize {
- private final String maxSize;
- private final long maxBytes;
- private final long maxCount;
-
- public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
- this.maxSize = maxSize;
- this.maxBytes = maxBytes;
- this.maxCount = maxCount;
- }
-
- public String getMaxSize() {
- return maxSize;
- }
-
- public long getMaxBytes() {
- return maxBytes;
- }
-
- public long getMaxCount() {
- return maxCount;
- }
-
- @Override
- public String toString() {
- return maxCount + " Objects/" + maxSize;
- }
- }
-
- private static class TimePeriod {
- private final String period;
- private final long millis;
-
- public TimePeriod(final String period, final long millis) {
- this.period = period;
- this.millis = millis;
- }
-
- public String getPeriod() {
- return period;
- }
-
- public long getMillis() {
- return millis;
- }
-
- @Override
- public String toString() {
- return period;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index f2387c2..7a5c45e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -202,6 +202,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final InetSocketAddress nodeApiAddress = nifiProperties.getNodeApiAddress();
final InetSocketAddress nodeSocketAddress = nifiProperties.getClusterNodeProtocolAddress();
+ final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
String nodeUuid = null;
final StateManager stateManager = controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG);
@@ -217,6 +218,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.nodeId = new NodeIdentifier(nodeUuid,
nodeApiAddress.getHostName(), nodeApiAddress.getPort(),
nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(),
+ loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(),
nifiProperties.getRemoteInputHost(), nifiProperties.getRemoteInputPort(),
nifiProperties.getRemoteInputHttpPort(), nifiProperties.isSiteToSiteSecure());
@@ -388,7 +390,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
@Override
- public ProtocolMessage handle(final ProtocolMessage request) throws ProtocolException {
+ public ProtocolMessage handle(final ProtocolMessage request, final Set<String> nodeIdentities) throws ProtocolException {
final long startNanos = System.nanoTime();
try {
switch (request.getType()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 15538b3..d47e198 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -16,34 +16,6 @@
*/
package org.apache.nifi.controller;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
@@ -58,6 +30,8 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
@@ -127,6 +101,33 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+
/**
*/
public class StandardFlowSynchronizer implements FlowSynchronizer {
@@ -837,9 +838,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final ProcessorDTO dto = FlowFromDOMFactory.getProcessor(processorElement, encryptor);
final ProcessorNode procNode = processGroup.getProcessor(dto.getId());
+ final ScheduledState procState = getScheduledState(procNode, controller);
updateNonFingerprintedProcessorSettings(procNode, dto);
- if (!procNode.getScheduledState().name().equals(dto.getState())) {
+ if (!procState.name().equals(dto.getState())) {
try {
switch (ScheduledState.valueOf(dto.getState())) {
case DISABLED:
@@ -855,9 +857,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false);
break;
case STOPPED:
- if (procNode.getScheduledState() == ScheduledState.DISABLED) {
+ if (procState == ScheduledState.DISABLED) {
procNode.getProcessGroup().enableProcessor(procNode);
- } else if (procNode.getScheduledState() == ScheduledState.RUNNING) {
+ } else if (procState == ScheduledState.RUNNING) {
controller.stopProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier());
}
break;
@@ -882,7 +884,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final PortDTO dto = FlowFromDOMFactory.getPort(portElement);
final Port port = processGroup.getInputPort(dto.getId());
- if (!port.getScheduledState().name().equals(dto.getState())) {
+ final ScheduledState portState = getScheduledState(port, controller);
+
+ if (!portState.name().equals(dto.getState())) {
switch (ScheduledState.valueOf(dto.getState())) {
case DISABLED:
// switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing),
@@ -896,9 +900,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
controller.startConnectable(port);
break;
case STOPPED:
- if (port.getScheduledState() == ScheduledState.DISABLED) {
+ if (portState == ScheduledState.DISABLED) {
port.getProcessGroup().enableInputPort(port);
- } else if (port.getScheduledState() == ScheduledState.RUNNING) {
+ } else if (portState == ScheduledState.RUNNING) {
controller.stopConnectable(port);
}
break;
@@ -911,7 +915,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final PortDTO dto = FlowFromDOMFactory.getPort(portElement);
final Port port = processGroup.getOutputPort(dto.getId());
- if (!port.getScheduledState().name().equals(dto.getState())) {
+ final ScheduledState portState = getScheduledState(port, controller);
+
+ if (!portState.name().equals(dto.getState())) {
switch (ScheduledState.valueOf(dto.getState())) {
case DISABLED:
// switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing),
@@ -925,9 +931,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
controller.startConnectable(port);
break;
case STOPPED:
- if (port.getScheduledState() == ScheduledState.DISABLED) {
+ if (portState == ScheduledState.DISABLED) {
port.getProcessGroup().enableOutputPort(port);
- } else if (port.getScheduledState() == ScheduledState.RUNNING) {
+ } else if (portState == ScheduledState.RUNNING) {
controller.stopConnectable(port);
}
break;
@@ -951,12 +957,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
continue;
}
+ final ScheduledState portState = getScheduledState(inputPort, controller);
+
if (portDescriptor.isTransmitting()) {
- if (inputPort.getScheduledState() != ScheduledState.RUNNING && inputPort.getScheduledState() != ScheduledState.STARTING) {
- rpg.startTransmitting(inputPort);
+ if (portState != ScheduledState.RUNNING && portState != ScheduledState.STARTING) {
+ controller.startTransmitting(inputPort);
}
- } else if (inputPort.getScheduledState() != ScheduledState.STOPPED && inputPort.getScheduledState() != ScheduledState.STOPPING) {
- rpg.stopTransmitting(inputPort);
+ } else if (portState != ScheduledState.STOPPED && portState != ScheduledState.STOPPING) {
+ controller.stopTransmitting(inputPort);
}
}
@@ -970,12 +978,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
continue;
}
+ final ScheduledState portState = getScheduledState(outputPort, controller);
+
if (portDescriptor.isTransmitting()) {
- if (outputPort.getScheduledState() != ScheduledState.RUNNING && outputPort.getScheduledState() != ScheduledState.STARTING) {
- rpg.startTransmitting(outputPort);
+ if (portState != ScheduledState.RUNNING && portState != ScheduledState.STARTING) {
+ controller.startTransmitting(outputPort);
}
- } else if (outputPort.getScheduledState() != ScheduledState.STOPPED && outputPort.getScheduledState() != ScheduledState.STOPPING) {
- rpg.stopTransmitting(outputPort);
+ } else if (portState != ScheduledState.STOPPED && portState != ScheduledState.STOPPING) {
+ controller.stopTransmitting(outputPort);
}
}
}
@@ -1073,6 +1083,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return processGroup;
}
+ private <T extends Connectable & Triggerable> ScheduledState getScheduledState(final T component, final FlowController flowController) {
+ final ScheduledState componentState = component.getScheduledState();
+ if (componentState == ScheduledState.STOPPED) {
+ if (flowController.isStartAfterInitialization(component)) {
+ return ScheduledState.RUNNING;
+ }
+ }
+
+ return componentState;
+ }
+
private Position toPosition(final PositionDTO dto) {
return new Position(dto.getX(), dto.getY());
}
@@ -1499,6 +1520,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
connection.getFlowFileQueue().setFlowFileExpiration(dto.getFlowFileExpiration());
}
+ if (dto.getLoadBalanceStrategy() != null) {
+ connection.getFlowFileQueue().setLoadBalanceStrategy(LoadBalanceStrategy.valueOf(dto.getLoadBalanceStrategy()), dto.getLoadBalancePartitionAttribute());
+ }
+
+ if (dto.getLoadBalanceCompression() != null) {
+ connection.getFlowFileQueue().setLoadBalanceCompression(LoadBalanceCompression.valueOf(dto.getLoadBalanceCompression()));
+ }
+
processGroup.addConnection(connection);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index c2b98e6..2cee3d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1426,9 +1426,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public synchronized int getTerminatedThreadCount() {
- return (int) activeThreads.values().stream()
- .filter(ActiveTask::isTerminated)
- .count();
+ int count = 0;
+ for (final ActiveTask task : activeThreads.values()) {
+ if (task.isTerminated()) {
+ count++;
+ }
+ }
+
+ return count;
}