You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:31 UTC
[08/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
new file mode 100644
index 0000000..3b880bb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -0,0 +1,1093 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
+import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.concurrency.TimedLock;
+import org.apache.nifi.util.timebuffer.LongEntityAccess;
+import org.apache.nifi.util.timebuffer.TimedBuffer;
+import org.apache.nifi.util.timebuffer.TimestampedLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
+ * processing. Must be thread safe.
+ *
+ * @author none
+ */
+public final class StandardFlowFileQueue implements FlowFileQueue {
+
+ // A single poll stops collecting once the caller's expired-record set has grown to this many entries.
+ public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
+ // pollSwappableRecords() returns nothing until the swap queue holds at least this many FlowFiles, and
+ // hands out at most this many per call.
+ public static final int SWAP_RECORD_POLL_SIZE = 10000;
+
+ // When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck. In order to avoid this,
+ // we keep track of how often we are obtaining the write lock. If we exceed some threshold, we start performing a Pre-fetch so that
+ // we can then poll many times without having to obtain the lock.
+ // If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to poll from queue for last 5 seconds, do a pre-fetch.
+ public static final int PREFETCH_POLL_THRESHOLD = 1000;
+ // Pre-fetch batch size used when at least one prioritizer is configured (see setPriorities).
+ public static final int PRIORITIZED_PREFETCH_SIZE = 10;
+ // Pre-fetch batch size used when no prioritizer is configured (see setPriorities).
+ public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000;
+ private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch?
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
+
+ // FlowFiles that are available to be polled, ordered by the Prioritizer comparator.
+ private PriorityQueue<FlowFileRecord> activeQueue = null;
+ // Running total of getSize() for the FlowFiles currently held in activeQueue.
+ private long activeQueueContentSize = 0L;
+ // FlowFiles added while in swap mode or once activeQueue reached swapThreshold; drained by
+ // pollSwappableRecords(), putSwappedRecords() and migrateSwapToActive().
+ private ArrayList<FlowFileRecord> swapQueue = null;
+
+ // Count and byte total of FlowFiles regarded as swapped: incremented when a FlowFile is added to swapQueue
+ // or through incrementSwapCount(), decremented when FlowFiles are moved into activeQueue.
+ private int swappedRecordCount = 0;
+ private long swappedContentSize = 0L;
+ // Back pressure data size threshold exactly as supplied by the caller, and the same value parsed into bytes.
+ private String maximumQueueDataSize;
+ private long maximumQueueByteCount;
+ // While true, every newly added FlowFile goes to swapQueue rather than activeQueue.
+ private boolean swapMode = false;
+ // Back pressure object count threshold; a value <= 0 disables the check (see determineIfFull).
+ private long maximumQueueObjectCount;
+
+ // FlowFile expiration in milliseconds; a value <= 0 means FlowFiles never expire (see getExpirationDate).
+ private final AtomicLong flowFileExpirationMillis;
+ private final Connection connection;
+ // FlowFile expiration exactly as supplied to setFlowFileExpiration.
+ private final AtomicReference<String> flowFileExpirationPeriod;
+ // Fair read/write lock; it is only used through the readLock and writeLock wrappers below.
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private final List<FlowFilePrioritizer> priorities;
+ // Active queue size at or above which newly added FlowFiles are sent to swapQueue.
+ private final int swapThreshold;
+ private final TimedLock readLock;
+ private final TimedLock writeLock;
+ private final String identifier;
+
+ // Cached result of determineIfFull(), readable without a lock through isFull().
+ private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
+ // Cached activeQueue.size(), readable without a lock through isActiveQueueEmpty().
+ private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
+ // Count and byte total of FlowFiles that have been polled but not yet acknowledged.
+ private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
+
+ // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
+ private final ProcessScheduler scheduler;
+
+ /**
+ * Creates an empty queue with no prioritizers, no back pressure thresholds and no FlowFile expiration.
+ *
+ * @param identifier the value returned by {@link #getIdentifier()}; also used to name the read and write locks
+ * @param connection the connection whose source and destination are registered with the scheduler on queue events
+ * @param scheduler the scheduler that is notified of queue events
+ * @param swapThreshold the active queue size at or above which newly added FlowFiles go to the swap queue
+ */
+ public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) {
+ this.identifier = identifier;
+ this.connection = connection;
+ this.scheduler = scheduler;
+ this.swapThreshold = swapThreshold;
+
+ this.priorities = new ArrayList<>();
+ this.activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
+ this.swapQueue = new ArrayList<>();
+
+ this.maximumQueueObjectCount = 0L;
+ this.maximumQueueByteCount = 0L;
+ this.maximumQueueDataSize = "0 MB";
+
+ this.flowFileExpirationMillis = new AtomicLong(0);
+ this.flowFileExpirationPeriod = new AtomicReference<>("0 mins");
+
+ this.readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
+ this.writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
+ }
+
+ /**
+ * @return the identifier this queue was constructed with
+ */
+ @Override
+ public String getIdentifier() {
+ return this.identifier;
+ }
+
+ /**
+ * @return a read-only view of the prioritizers currently configured for this queue
+ */
+ @Override
+ public List<FlowFilePrioritizer> getPriorities() {
+ final List<FlowFilePrioritizer> readOnlyView = Collections.unmodifiableList(this.priorities);
+ return readOnlyView;
+ }
+
+ /**
+ * @return the swap threshold this queue was constructed with
+ */
+ @Override
+ public int getSwapThreshold() {
+ return this.swapThreshold;
+ }
+
+ /**
+ * Replaces the prioritizers used to order the active queue, re-sorting the FlowFiles it currently
+ * holds, and selects the pre-fetch batch size that matches (small when prioritized, large otherwise).
+ *
+ * @param newPriorities the prioritizers to apply, in order of precedence
+ */
+ @Override
+ public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+ // Snapshot the argument before touching our own list: the caller may hand back the view returned by
+ // getPriorities(), which is backed by 'priorities' and would be emptied by the clear() below.
+ final List<FlowFilePrioritizer> snapshot = new ArrayList<>(newPriorities);
+
+ writeLock.lock();
+ try {
+ final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(snapshot));
+ newQueue.addAll(activeQueue);
+ activeQueue = newQueue;
+ priorities.clear();
+ priorities.addAll(snapshot);
+
+ if (snapshot.isEmpty()) {
+ prefetchSize = UNPRIORITIZED_PREFETCH_SIZE;
+ } else {
+ prefetchSize = PRIORITIZED_PREFETCH_SIZE;
+ }
+ } finally {
+ writeLock.unlock("setPriorities");
+ }
+ }
+
+ /**
+ * Sets the back pressure object count threshold and re-evaluates whether the queue is full.
+ *
+ * @param maxQueueSize the new object count threshold
+ */
+ @Override
+ public void setBackPressureObjectThreshold(final long maxQueueSize) {
+ writeLock.lock();
+ try {
+ this.maximumQueueObjectCount = maxQueueSize;
+ final boolean fullNow = determineIfFull();
+ queueFullRef.set(fullNow);
+ } finally {
+ writeLock.unlock("setBackPressureObjectThreshold");
+ }
+ }
+
+ /**
+ * @return the back pressure object count threshold, read under the read lock
+ */
+ @Override
+ public long getBackPressureObjectThreshold() {
+ final long threshold;
+ readLock.lock();
+ try {
+ threshold = this.maximumQueueObjectCount;
+ } finally {
+ readLock.unlock("getBackPressureObjectThreshold");
+ }
+ return threshold;
+ }
+
+ /**
+ * Sets the back pressure data size threshold and re-evaluates whether the queue is full.
+ *
+ * @param maxDataSize the new threshold as a data size string; it is parsed into bytes before being stored
+ */
+ @Override
+ public void setBackPressureDataSizeThreshold(final String maxDataSize) {
+ writeLock.lock();
+ try {
+ // Parse first so that a string that fails to parse leaves both fields untouched.
+ final long parsedBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+ this.maximumQueueByteCount = parsedBytes;
+ this.maximumQueueDataSize = maxDataSize;
+ queueFullRef.set(determineIfFull());
+ } finally {
+ writeLock.unlock("setBackPressureDataSizeThreshold");
+ }
+ }
+
+ /**
+ * @return the back pressure data size threshold exactly as it was last supplied, read under the read lock
+ */
+ @Override
+ public String getBackPressureDataSizeThreshold() {
+ final String threshold;
+ readLock.lock();
+ try {
+ threshold = this.maximumQueueDataSize;
+ } finally {
+ readLock.unlock("getBackPressureDataSizeThreshold");
+ }
+ return threshold;
+ }
+
+ /**
+ * @return the total size of the queue (active, swapped, unacknowledged and pre-fetched), computed under the read lock
+ */
+ @Override
+ public QueueSize size() {
+ final QueueSize total;
+ readLock.lock();
+ try {
+ total = getQueueSize();
+ } finally {
+ readLock.unlock("getSize");
+ }
+ return total;
+ }
+
+ /**
+ * Adds up the object counts and byte counts of the active queue, the swapped FlowFiles, the
+ * unacknowledged FlowFiles and whatever remains in the current pre-fetch batch.
+ *
+ * MUST be called with lock held
+ *
+ * @return the combined size of the queue
+ */
+ private QueueSize getQueueSize() {
+ final QueueSize unacked = unacknowledgedSizeRef.get();
+ final PreFetch batch = preFetchRef.get();
+ final QueueSize prefetched = (batch == null) ? new QueueSize(0, 0L) : batch.size();
+
+ int objectCount = activeQueue.size() + swappedRecordCount;
+ objectCount += unacked.getObjectCount();
+ objectCount += prefetched.getObjectCount();
+
+ long byteCount = activeQueueContentSize + swappedContentSize;
+ byteCount += unacked.getByteCount();
+ byteCount += prefetched.getByteCount();
+
+ return new QueueSize(objectCount, byteCount);
+ }
+
+ @Override
+ public long contentSize() {
+ readLock.lock();
+ try {
+ final PreFetch prefetch = preFetchRef.get();
+ if (prefetch == null) {
+ return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount();
+ } else {
+ return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount();
+ }
+ } finally {
+ readLock.unlock("getContentSize");
+ }
+ }
+
+ /**
+ * @return true only if the active queue, the swapped FlowFiles, the unacknowledged FlowFiles and the
+ * current pre-fetch batch are all empty
+ */
+ @Override
+ public boolean isEmpty() {
+ readLock.lock();
+ try {
+ if (!activeQueue.isEmpty()) {
+ return false;
+ }
+ if (swappedRecordCount != 0) {
+ return false;
+ }
+ if (unacknowledgedSizeRef.get().getObjectCount() != 0) {
+ return false;
+ }
+
+ final PreFetch batch = preFetchRef.get();
+ return batch == null || batch.size().getObjectCount() == 0;
+ } finally {
+ readLock.unlock("isEmpty");
+ }
+ }
+
+ /**
+ * Lock-free check that relies on the cached active queue size and the current pre-fetch batch.
+ *
+ * @return true if neither the active queue nor the pre-fetch batch holds a FlowFile
+ */
+ @Override
+ public boolean isActiveQueueEmpty() {
+ if (activeQueueSizeRef.get() != 0) {
+ return false;
+ }
+
+ final PreFetch batch = preFetchRef.get();
+ if (batch == null) {
+ return true;
+ }
+ return batch.size().getObjectCount() == 0;
+ }
+
+ @Override
+ public QueueSize getActiveQueueSize() {
+ readLock.lock();
+ try {
+ final PreFetch preFetch = preFetchRef.get();
+ if (preFetch == null) {
+ return new QueueSize(activeQueue.size(), activeQueueContentSize);
+ } else {
+ final QueueSize preFetchSize = preFetch.size();
+ return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount());
+ }
+ } finally {
+ readLock.unlock("getActiveQueueSize");
+ }
+ }
+
+ /**
+ * Removes one FlowFile from the unacknowledged totals. If the queue is currently flagged as full,
+ * the update happens under the write lock and the full flag is re-evaluated.
+ *
+ * @param flowFile the FlowFile being acknowledged
+ */
+ @Override
+ public void acknowledge(final FlowFileRecord flowFile) {
+ if (!queueFullRef.get()) {
+ updateUnacknowledgedSize(-1, -flowFile.getSize());
+ } else {
+ writeLock.lock();
+ try {
+ updateUnacknowledgedSize(-1, -flowFile.getSize());
+ queueFullRef.set(determineIfFull());
+ } finally {
+ writeLock.unlock("acknowledge(FlowFileRecord)");
+ }
+ }
+
+ // The queue may have been full and no longer be; let an event-driven source know it may be able
+ // to run again. This is done after the write lock has been released.
+ final boolean sourceIsEventDriven = connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN;
+ if (sourceIsEventDriven) {
+ scheduler.registerEvent(connection.getSource());
+ }
+ }
+
+ /**
+ * Removes a batch of FlowFiles from the unacknowledged totals. If the queue is currently flagged as
+ * full, the update happens under the write lock and the full flag is re-evaluated.
+ *
+ * @param flowFiles the FlowFiles being acknowledged
+ */
+ @Override
+ public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+ long totalSize = 0L;
+ for (final FlowFileRecord flowFile : flowFiles) {
+ totalSize += flowFile.getSize();
+ }
+
+ if (queueFullRef.get()) {
+ writeLock.lock();
+ try {
+ updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
+ queueFullRef.set(determineIfFull());
+ } finally {
+ // Label this overload distinctly so lock-timing diagnostics are not attributed to the
+ // single-FlowFile overload.
+ writeLock.unlock("acknowledge(Collection)");
+ }
+ } else {
+ updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
+ }
+
+ if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+ // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
+ // because of back pressure caused by this queue.
+ scheduler.registerEvent(connection.getSource());
+ }
+ }
+
+ /**
+ * @return the most recently computed "queue is full" flag; no lock is taken
+ */
+ @Override
+ public boolean isFull() {
+ final boolean full = queueFullRef.get();
+ return full;
+ }
+
+ /**
+ * Compares the current queue size against the back pressure thresholds. A threshold that is zero or
+ * negative is treated as disabled.
+ *
+ * MUST be called with either the read or write lock held
+ *
+ * @return true if an enabled threshold has been reached or exceeded
+ */
+ private boolean determineIfFull() {
+ final long objectLimit = maximumQueueObjectCount;
+ final long byteLimit = maximumQueueByteCount;
+ final boolean objectLimited = objectLimit > 0;
+ final boolean byteLimited = byteLimit > 0;
+
+ if (!objectLimited && !byteLimited) {
+ return false;
+ }
+
+ final QueueSize current = getQueueSize();
+ return (objectLimited && current.getObjectCount() >= objectLimit)
+ || (byteLimited && current.getByteCount() >= byteLimit);
+ }
+
+ /**
+ * Adds one FlowFile to the queue. It goes to the swap queue if the queue is in swap mode or the active
+ * queue has reached the swap threshold; otherwise it goes to the active queue.
+ *
+ * @param file the FlowFile to enqueue
+ */
+ @Override
+ public void put(final FlowFileRecord file) {
+ writeLock.lock();
+ try {
+ final boolean sendToSwap = swapMode || activeQueue.size() >= swapThreshold;
+ if (!sendToSwap) {
+ activeQueueContentSize += file.getSize();
+ activeQueue.add(file);
+ } else {
+ swapQueue.add(file);
+ swappedContentSize += file.getSize();
+ swappedRecordCount++;
+ swapMode = true;
+ }
+
+ queueFullRef.set(determineIfFull());
+ } finally {
+ activeQueueSizeRef.set(activeQueue.size());
+ writeLock.unlock("put(FlowFileRecord)");
+ }
+
+ // Notify outside of the write lock: the scheduler must not be called while it is held.
+ final boolean destinationIsEventDriven = connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN;
+ if (destinationIsEventDriven) {
+ scheduler.registerEvent(connection.getDestination());
+ }
+ }
+
+ /**
+ * Adds a batch of FlowFiles to the queue. The whole batch goes to the swap queue if the queue is in
+ * swap mode or the active queue size is at least the swap threshold minus the batch size; otherwise
+ * the whole batch goes to the active queue.
+ *
+ * @param files the FlowFiles to enqueue
+ */
+ @Override
+ public void putAll(final Collection<FlowFileRecord> files) {
+ final int batchCount = files.size();
+ long batchBytes = 0L;
+ for (final FlowFile each : files) {
+ batchBytes += each.getSize();
+ }
+
+ writeLock.lock();
+ try {
+ final boolean sendToSwap = swapMode || activeQueue.size() >= swapThreshold - batchCount;
+ if (!sendToSwap) {
+ activeQueueContentSize += batchBytes;
+ activeQueue.addAll(files);
+ } else {
+ swapQueue.addAll(files);
+ swappedContentSize += batchBytes;
+ swappedRecordCount += batchCount;
+ swapMode = true;
+ }
+
+ queueFullRef.set(determineIfFull());
+ } finally {
+ activeQueueSizeRef.set(activeQueue.size());
+ writeLock.unlock("putAll");
+ }
+
+ // Notify outside of the write lock: the scheduler must not be called while it is held.
+ final boolean destinationIsEventDriven = connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN;
+ if (destinationIsEventDriven) {
+ scheduler.registerEvent(connection.getDestination());
+ }
+ }
+
+ /**
+ * Removes the oldest {@link #SWAP_RECORD_POLL_SIZE} FlowFiles from the swap queue so that they can be
+ * swapped out. The swapped counters are left untouched by this method.
+ *
+ * @return the removed FlowFiles in their original order, or null if the swap queue holds fewer than
+ * {@link #SWAP_RECORD_POLL_SIZE} FlowFiles
+ */
+ @Override
+ public List<FlowFileRecord> pollSwappableRecords() {
+ writeLock.lock();
+ try {
+ if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+ return null;
+ }
+
+ // Copy the head of the list and drop it in one range removal. Removing the first element
+ // repeatedly through an iterator shifts the remainder of the ArrayList on every call.
+ final List<FlowFileRecord> head = swapQueue.subList(0, SWAP_RECORD_POLL_SIZE);
+ final List<FlowFileRecord> swapRecords = new ArrayList<>(head);
+ head.clear();
+
+ swapQueue.trimToSize();
+ return swapRecords;
+ } finally {
+ writeLock.unlock("pollSwappableRecords");
+ }
+ }
+
+ /**
+ * Moves FlowFiles that were swapped out back into the active queue. When nothing else remains swapped
+ * out and the swap queue is too small for {@link #pollSwappableRecords()} to return anything, the swap
+ * queue is folded into the active queue as well and swap mode ends. The destination is always
+ * registered with the scheduler afterwards, once the write lock has been released.
+ *
+ * @param records the FlowFiles being swapped back in
+ */
+ @Override
+ public void putSwappedRecords(final Collection<FlowFileRecord> records) {
+ writeLock.lock();
+ try {
+ for (final FlowFileRecord swappedIn : records) {
+ swappedContentSize -= swappedIn.getSize();
+ swappedRecordCount--;
+ activeQueueContentSize += swappedIn.getSize();
+ activeQueue.add(swappedIn);
+ }
+
+ // If the counter exceeds the swap queue size, more FlowFiles are still waiting to be swapped in.
+ final boolean moreToSwapIn = swappedRecordCount > swapQueue.size();
+
+ // If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix
+ if (!moreToSwapIn && swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+ for (final FlowFileRecord waiting : swapQueue) {
+ activeQueue.add(waiting);
+ activeQueueContentSize += waiting.getSize();
+ }
+ swapQueue.clear();
+ swappedContentSize = 0L;
+ swappedRecordCount = 0;
+ swapMode = false;
+ }
+ } finally {
+ activeQueueSizeRef.set(activeQueue.size());
+ writeLock.unlock("putSwappedRecords");
+ scheduler.registerEvent(connection.getDestination());
+ }
+ }
+
+ /**
+ * Adds to the swapped FlowFile counters without touching any queue.
+ *
+ * @param numRecords the number of FlowFiles to add to the swapped record count
+ * @param contentSize the number of bytes to add to the swapped content size
+ */
+ @Override
+ public void incrementSwapCount(final int numRecords, final long contentSize) {
+ writeLock.lock();
+ try {
+ this.swappedRecordCount += numRecords;
+ this.swappedContentSize += contentSize;
+ } finally {
+ writeLock.unlock("incrementSwapCount");
+ }
+ }
+
+ /**
+ * @return the number of FlowFiles in the active queue plus the number that are unacknowledged
+ */
+ @Override
+ public int unswappedSize() {
+ final int unswapped;
+ readLock.lock();
+ try {
+ unswapped = activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount();
+ } finally {
+ readLock.unlock("unswappedSize");
+ }
+ return unswapped;
+ }
+
+ /**
+ * @return the swapped record counter, read under the read lock
+ */
+ @Override
+ public int getSwapRecordCount() {
+ final int swapped;
+ readLock.lock();
+ try {
+ swapped = this.swappedRecordCount;
+ } finally {
+ readLock.unlock("getSwapRecordCount");
+ }
+ return swapped;
+ }
+
+ /**
+ * Returns the number of FlowFiles currently held in the in-memory swap queue. When debug logging is
+ * enabled, a breakdown of the active, swapped and unacknowledged sizes is logged as a side effect.
+ *
+ * @return the size of the swap queue
+ */
+ @Override
+ public int getSwapQueueSize() {
+ readLock.lock();
+ try {
+ if (logger.isDebugEnabled()) {
+ final long bytesPerMb = 1024L * 1024L;
+ final QueueSize unacked = unacknowledgedSizeRef.get();
+ final long activeMb = activeQueueContentSize / bytesPerMb;
+ final long swappedMb = swappedContentSize / bytesPerMb;
+ final long unackedMb = unacked.getByteCount() / bytesPerMb;
+
+ logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB",
+ activeQueue.size(), activeMb,
+ swappedRecordCount, swappedMb,
+ unacked.getObjectCount(), unackedMb);
+ }
+
+ return swapQueue.size();
+ } finally {
+ readLock.unlock("getSwapQueueSize");
+ }
+ }
+
+ /**
+ * Tells whether the current time is past the given timestamp.
+ *
+ * @param maxAge a timestamp in milliseconds since the epoch, or null for "never"
+ * @return true if maxAge is non-null and strictly earlier than the current time
+ */
+ private boolean isLaterThan(final Long maxAge) {
+ return maxAge != null && maxAge < System.currentTimeMillis();
+ }
+
+ /**
+ * Computes the moment at which a FlowFile expires.
+ *
+ * @param flowFile the FlowFile to inspect; may be null
+ * @param expirationMillis how long a FlowFile may live after its entry date, in milliseconds
+ * @return the entry date plus expirationMillis, or null if flowFile is null or expirationMillis is not positive
+ */
+ private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
+ if (flowFile == null || expirationMillis <= 0) {
+ return null;
+ }
+ return flowFile.getEntryDate() + expirationMillis;
+ }
+
+ /**
+ * Polls a single FlowFile. The current pre-fetch batch is consulted first, without taking the write
+ * lock; if it yields nothing, the active queue is polled under the write lock. The FlowFile that is
+ * returned is added to the unacknowledged totals.
+ *
+ * @param expiredRecords receives every expired FlowFile encountered while polling
+ * @return the next FlowFile, or null if none was obtained
+ */
+ @Override
+ public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+ FlowFileRecord flowFile = null;
+
+ // First check if we have any records Pre-Fetched.
+ final long expirationMillis = flowFileExpirationMillis.get();
+ final PreFetch preFetch = preFetchRef.get();
+ if (preFetch != null) {
+ if (preFetch.isExpired()) {
+ // The batch has outlived its time limit: hand what is left of it back to the active queue.
+ requeueExpiredPrefetch(preFetch);
+ } else {
+ while (true) {
+ final FlowFileRecord next = preFetch.nextRecord();
+ if (next == null) {
+ // batch exhausted
+ break;
+ }
+
+ if (isLaterThan(getExpirationDate(next, expirationMillis))) {
+ expiredRecords.add(next);
+ continue;
+ }
+
+ updateUnacknowledgedSize(1, next.getSize());
+ return next;
+ }
+
+ // Nothing usable was left in this batch; clear it unless it has already been replaced.
+ preFetchRef.compareAndSet(preFetch, null);
+ }
+ }
+
+ writeLock.lock();
+ try {
+ flowFile = doPoll(expiredRecords, expirationMillis);
+ return flowFile;
+ } finally {
+ activeQueueSizeRef.set(activeQueue.size());
+ writeLock.unlock("poll(Set)");
+
+ // Updated after the lock is released; flowFile is still null here if doPoll threw or found nothing.
+ if (flowFile != null) {
+ updateUnacknowledgedSize(1, flowFile.getSize());
+ }
+ }
+ }
+
+ /**
+ * Polls the active queue for a single FlowFile, skipping over (and collecting) expired ones. Called by
+ * {@link #poll(Set)} with the write lock held. May trigger a pre-fetch when polls are frequent.
+ *
+ * @param expiredRecords receives every expired FlowFile removed from the active queue
+ * @param expirationMillis the FlowFile expiration to apply, in milliseconds
+ * @return the polled FlowFile, or null if the queue is empty, its head is penalized, or the
+ * expired-record limit was reached
+ */
+ private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ FlowFileRecord flowFile;
+ boolean isExpired;
+
+ migrateSwapToActive();
+ boolean queueFullAtStart = queueFullRef.get();
+
+ do {
+ flowFile = this.activeQueue.poll();
+
+ // A null flowFile (empty queue) has no expiration date, so isExpired is false and the loop ends.
+ isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
+ if (isExpired) {
+ expiredRecords.add(flowFile);
+ if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+ // Account for this record here because the break skips the subtraction below.
+ activeQueueContentSize -= flowFile.getSize();
+ break;
+ }
+ } else if (flowFile != null && flowFile.isPenalized()) {
+ // Put the penalized FlowFile back and give up; it stays counted in activeQueueContentSize.
+ this.activeQueue.add(flowFile);
+ flowFile = null;
+ break;
+ }
+
+ if (flowFile != null) {
+ activeQueueContentSize -= flowFile.getSize();
+ }
+ } while (isExpired);
+
+ // if at least 1 FlowFile was expired & the queue was full before we started, then
+ // we need to determine whether or not the queue is full again. If no FlowFile was expired,
+ // then the queue will still be full until the appropriate #acknowledge method is called.
+ if (queueFullAtStart && !expiredRecords.isEmpty()) {
+ queueFullRef.set(determineIfFull());
+ }
+
+ if (incrementPollCount()) {
+ prefetch();
+ }
+ // isExpired is still true only when the loop was left through the expired-record limit.
+ return isExpired ? null : flowFile;
+ }
+
+ /**
+ * Polls up to maxResults FlowFiles. The current pre-fetch batch is consulted first, without taking the
+ * write lock; if it yields at least one FlowFile those are returned as-is. Otherwise the active queue
+ * is drained under the write lock. Returned FlowFiles are added to the unacknowledged totals.
+ *
+ * @param maxResults the maximum number of FlowFiles to return
+ * @param expiredRecords receives every expired FlowFile encountered while polling
+ * @return the polled FlowFiles; possibly empty, never null
+ */
+ @Override
+ public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
+ final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults));
+
+ // First check if we have any records Pre-Fetched.
+ final long expirationMillis = flowFileExpirationMillis.get();
+ final PreFetch preFetch = preFetchRef.get();
+ if (preFetch != null) {
+ if (preFetch.isExpired()) {
+ // The batch has outlived its time limit: hand what is left of it back to the active queue.
+ requeueExpiredPrefetch(preFetch);
+ } else {
+ long totalSize = 0L;
+ // Note that expired FlowFiles also consume an iteration of this loop.
+ for (int i = 0; i < maxResults; i++) {
+ final FlowFileRecord next = preFetch.nextRecord();
+ if (next == null) {
+ break;
+ }
+
+ if (isLaterThan(getExpirationDate(next, expirationMillis))) {
+ expiredRecords.add(next);
+ continue;
+ }
+
+ records.add(next);
+ totalSize += next.getSize();
+ }
+
+ // If anything was prefetched, use what we have.
+ if (!records.isEmpty()) {
+ updateUnacknowledgedSize(records.size(), totalSize);
+ return records;
+ }
+
+ // Nothing usable was left in this batch; clear it unless it has already been replaced.
+ preFetchRef.compareAndSet(preFetch, null);
+ }
+ }
+
+ // 'records' is always empty at this point.
+ writeLock.lock();
+ try {
+ doPoll(records, maxResults, expiredRecords);
+ } finally {
+ activeQueueSizeRef.set(activeQueue.size());
+ writeLock.unlock("poll(int, Set)");
+ }
+ return records;
+ }
+
+ /**
+ * Drains up to maxResults FlowFiles from the active queue into records and moves them to the
+ * unacknowledged totals. Called by {@link #poll(int, Set)} with the write lock held. May trigger a
+ * pre-fetch when polls are frequent.
+ *
+ * @param records receives the polled FlowFiles
+ * @param maxResults the maximum number of FlowFiles that records may hold
+ * @param expiredRecords receives every expired FlowFile removed from the active queue
+ */
+ private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
+ migrateSwapToActive();
+
+ final boolean queueFullAtStart = queueFullRef.get();
+
+ final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
+
+ // Measure the polled FlowFiles directly. Deriving the figure as "bytes drained minus bytes of all
+ // expired records" is wrong whenever expiredRecords already held entries before this call (for
+ // example ones expired out of the pre-fetch batch), because those bytes were never drained here.
+ long polledBytes = 0L;
+ for (final FlowFileRecord record : records) {
+ polledBytes += record.getSize();
+ }
+
+ activeQueueContentSize -= bytesDrained;
+ updateUnacknowledgedSize(records.size(), polledBytes);
+
+ // if at least 1 FlowFile was expired & the queue was full before we started, then
+ // we need to determine whether or not the queue is full again. If no FlowFile was expired,
+ // then the queue will still be full until the appropriate #acknowledge method is called.
+ if (queueFullAtStart && !expiredRecords.isEmpty()) {
+ queueFullRef.set(determineIfFull());
+ }
+
+ if (incrementPollCount()) {
+ prefetch();
+ }
+ }
+
+ /**
+ * Moves FlowFiles from the in-memory swap queue to the active queue until the active queue reaches the
+ * swap threshold, so that they do not have to be swapped out to disk and back in again.
+ *
+ * Without this, a queue that is fed in surges could leave FlowFiles stranded on the swap queue: too few
+ * of them to be swapped out, yet never made active until enough further FlowFiles arrive. Nothing is
+ * migrated while FlowFiles are already swapped out, so that those come back in the order they left.
+ *
+ * This method MUST be called with the writeLock held.
+ */
+ private void migrateSwapToActive() {
+ // By far the most common case: nothing is swapped at all, so get out as cheaply as possible.
+ final boolean nothingSwapped = swappedRecordCount == 0 && swapQueue.isEmpty();
+ if (nothingSwapped) {
+ return;
+ }
+
+ // The counter exceeding the swap queue size means FlowFiles are swapped out already; leave it to
+ // the external process to swap them back in first.
+ final boolean swappedOutElsewhere = swappedRecordCount > swapQueue.size();
+ if (swappedOutElsewhere) {
+ return;
+ }
+
+ for (final Iterator<FlowFileRecord> pending = swapQueue.iterator(); activeQueue.size() < swapThreshold && pending.hasNext();) {
+ final FlowFileRecord migrated = pending.next();
+ activeQueue.add(migrated);
+ activeQueueContentSize += migrated.getSize();
+ swappedContentSize -= migrated.getSize();
+ swappedRecordCount--;
+
+ pending.remove();
+ }
+
+ if (swappedRecordCount == 0) {
+ swapMode = false;
+ }
+ }
+
+ /**
+ * Moves FlowFiles from sourceQueue into destination until destination holds maxResults FlowFiles, the
+ * source is exhausted, a penalized FlowFile is reached (it is put back), or the expired-record limit is
+ * hit. Expired FlowFiles are diverted into expiredRecords instead of destination.
+ *
+ * @param sourceQueue the queue to drain
+ * @param destination receives the non-expired FlowFiles
+ * @param maxResults the maximum size that destination may reach
+ * @param expiredRecords receives the expired FlowFiles
+ * @return the total size in bytes of every FlowFile removed from sourceQueue, expired or not
+ */
+ @Override
+ public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
+ long drainedSize = 0L;
+ FlowFileRecord pulled = null;
+
+ final long expirationMillis = this.flowFileExpirationMillis.get();
+ while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
+ if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
+ expiredRecords.add(pulled);
+ // Count the record before checking the limit: it has left the source queue either way, and
+ // skipping it would leave the caller's content-size bookkeeping too high.
+ drainedSize += pulled.getSize();
+ if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+ break;
+ }
+ } else if (pulled.isPenalized()) {
+ // Put it back; it never left the queue as far as the accounting is concerned.
+ sourceQueue.add(pulled);
+ break;
+ } else {
+ destination.add(pulled);
+ drainedSize += pulled.getSize();
+ }
+ }
+ return drainedSize;
+ }
+
+ /**
+ * Polls the active queue under the write lock, letting the filter decide which FlowFiles to take.
+ * Accepted FlowFiles are added to the unacknowledged totals; rejected ones are put back.
+ * NOTE(review): this method does not look at the pre-fetch batch; confirm that is intended.
+ *
+ * @param filter decides, per FlowFile, whether it is accepted and whether polling continues
+ * @param expiredRecords receives every expired FlowFile removed from the active queue
+ * @return the accepted FlowFiles; possibly empty, never null
+ */
+ @Override
+ public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+ writeLock.lock();
+ try {
+ migrateSwapToActive();
+ if (activeQueue.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final long expirationMillis = this.flowFileExpirationMillis.get();
+ final boolean queueFullAtStart = queueFullRef.get();
+
+ final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
+ // FlowFiles the filter rejected; they are held aside and re-added once the scan is over.
+ final List<FlowFileRecord> unselected = new ArrayList<>();
+
+ while (true) {
+ FlowFileRecord flowFile = this.activeQueue.poll();
+ if (flowFile == null) {
+ break;
+ }
+
+ final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
+ if (isExpired) {
+ expiredRecords.add(flowFile);
+ activeQueueContentSize -= flowFile.getSize();
+
+ if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+ break;
+ } else {
+ continue;
+ }
+ } else if (flowFile.isPenalized()) {
+ this.activeQueue.add(flowFile);
+ flowFile = null;
+ break; // just stop searching because the rest are all penalized.
+ }
+
+ final FlowFileFilterResult result = filter.filter(flowFile);
+ if (result.isAccept()) {
+ activeQueueContentSize -= flowFile.getSize();
+
+ updateUnacknowledgedSize(1, flowFile.getSize());
+ selectedFlowFiles.add(flowFile);
+ } else {
+ // Still counted in activeQueueContentSize, since it goes back into the active queue below.
+ unselected.add(flowFile);
+ }
+
+ if (!result.isContinue()) {
+ break;
+ }
+ }
+
+ this.activeQueue.addAll(unselected);
+
+ // if at least 1 FlowFile was expired & the queue was full before we started, then
+ // we need to determine whether or not the queue is full again. If no FlowFile was expired,
+ // then the queue will still be full until the appropriate #acknowledge method is called.
+ if (queueFullAtStart && !expiredRecords.isEmpty()) {
+ queueFullRef.set(determineIfFull());
+ }
+
+ return selectedFlowFiles;
+ } finally {
+ activeQueueSizeRef.set(activeQueue.size());
+ writeLock.unlock("poll(Filter, Set)");
+ }
+ }
+
+ /**
+ * Orders FlowFiles for the active queue. Unpenalized FlowFiles come before penalized ones; two
+ * penalized FlowFiles are ordered by penalty expiration. Ties are broken by the configured
+ * prioritizers in order, then by content claim (no claim first), then by claim offset, then by id.
+ */
+ private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+
+ private Prioritizer(final List<FlowFilePrioritizer> priorities) {
+ if (priorities != null) {
+ prioritizers.addAll(priorities);
+ }
+ }
+
+ @Override
+ public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
+ final boolean firstPenalized = f1.isPenalized();
+ final boolean secondPenalized = f2.isPenalized();
+
+ // Exactly one is penalized: the penalized one sorts last.
+ if (firstPenalized != secondPenalized) {
+ return firstPenalized ? 1 : -1;
+ }
+
+ // Both penalized: the one whose penalty ends first sorts first.
+ if (firstPenalized) {
+ final int byPenalty = Long.compare(f1.getPenaltyExpirationMillis(), f2.getPenaltyExpirationMillis());
+ if (byPenalty != 0) {
+ return byPenalty;
+ }
+ }
+
+ for (final FlowFilePrioritizer prioritizer : prioritizers) {
+ final int byPrioritizer = prioritizer.compare(f1, f2);
+ if (byPrioritizer != 0) {
+ return byPrioritizer;
+ }
+ }
+
+ final ContentClaim firstClaim = f1.getContentClaim();
+ final ContentClaim secondClaim = f2.getContentClaim();
+
+ // put the one without a claim first
+ if (firstClaim == null) {
+ if (secondClaim != null) {
+ return -1;
+ }
+ } else if (secondClaim == null) {
+ return 1;
+ } else {
+ final int byClaim = firstClaim.compareTo(secondClaim);
+ if (byClaim != 0) {
+ return byClaim;
+ }
+
+ final int byOffset = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
+ if (byOffset != 0) {
+ return byOffset;
+ }
+ }
+
+ return Long.compare(f1.getId(), f2.getId());
+ }
+ }
+
+ /**
+ * @return the FlowFile expiration period exactly as it was last supplied
+ */
+ @Override
+ public String getFlowFileExpiration() {
+ final String period = flowFileExpirationPeriod.get();
+ return period;
+ }
+
+ /**
+ * @param timeUnit the unit to express the expiration in
+ * @return the FlowFile expiration converted from milliseconds to timeUnit and narrowed to an int
+ */
+ @Override
+ public int getFlowFileExpiration(final TimeUnit timeUnit) {
+ final long millis = flowFileExpirationMillis.get();
+ final long converted = timeUnit.convert(millis, TimeUnit.MILLISECONDS);
+ return (int) converted;
+ }
+
+ /**
+ * Sets how long a FlowFile may stay queued before it is treated as expired.
+ *
+ * @param flowExpirationPeriod the expiration as a time duration string
+ * @throws IllegalArgumentException if the duration is negative
+ */
+ @Override
+ public void setFlowFileExpiration(final String flowExpirationPeriod) {
+ final long parsedMillis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
+ final boolean negative = parsedMillis < 0;
+ if (negative) {
+ throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
+ }
+
+ flowFileExpirationPeriod.set(flowExpirationPeriod);
+ flowFileExpirationMillis.set(parsedMillis);
+ }
+
+ /**
+ * @return a short description of this queue that includes its identifier
+ */
+ @Override
+ public String toString() {
+ final String description = "FlowFileQueue[id=" + this.identifier + "]";
+ return description;
+ }
+
+ /**
+ * Acquires this queue's write lock, keeping other threads from interacting with the queue until
+ * {@link #unlock()} is called.
+ */
+ public void lock() {
+ this.writeLock.lock();
+ }
+
+ /**
+ * Releases the write lock acquired through {@link #lock()}.
+ */
+ public void unlock() {
+ this.writeLock.unlock("external unlock");
+ }
+
+ /**
+ * Atomically adjusts the unacknowledged totals, retrying until the compare-and-set succeeds.
+ *
+ * @param addToCount the change in object count; may be negative
+ * @param addToSize the change in byte count; may be negative
+ */
+ private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
+ while (true) {
+ final QueueSize current = unacknowledgedSizeRef.get();
+ final int newCount = current.getObjectCount() + addToCount;
+ final long newBytes = current.getByteCount() + addToSize;
+ final QueueSize replacement = new QueueSize(newCount, newBytes);
+
+ if (unacknowledgedSizeRef.compareAndSet(current, replacement)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Returns the unread remainder of a pre-fetch batch to the active queue under the write lock, then
+ * clears the batch unless it has already been replaced.
+ *
+ * @param prefetch the batch to hand back; a null value is ignored
+ */
+ private void requeueExpiredPrefetch(final PreFetch prefetch) {
+ if (prefetch != null) {
+ writeLock.lock();
+ try {
+ final long requeuedBytes = prefetch.requeue(activeQueue);
+ activeQueueContentSize += requeuedBytes;
+ preFetchRef.compareAndSet(prefetch, null);
+ } finally {
+ writeLock.unlock("requeueExpiredPrefetch");
+ }
+ }
+ }
+
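+ /**
+ * The current batch of pre-fetched FlowFiles, or null if there is none. The prefetch() method that
+ * populates it MUST be called with write lock held.
+ */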
+ private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>();
+
+ /**
+ * Moves up to prefetchSize FlowFiles from the active queue into a new pre-fetch batch so that they
+ * can subsequently be polled without the write lock. Nothing happens if the active queue is empty,
+ * if the current batch still has FlowFiles left, or if a penalized FlowFile is reached before the
+ * batch is complete (everything polled so far is then put back).
+ *
+ * MUST be called with write lock held.
+ */
+ private void prefetch() {
+ if (activeQueue.isEmpty()) {
+ return;
+ }
+
+ final PreFetch curPreFetch = preFetchRef.get();
+ if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) {
+ return;
+ }
+
+ final int numToFetch = Math.min(prefetchSize, activeQueue.size());
+ final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch);
+ long contentSize = 0L;
+ for (int i = 0; i < numToFetch; i++) {
+ final FlowFileRecord record = activeQueue.poll();
+ if (record == null || record.isPenalized()) {
+ // not enough unpenalized records to pull. Put all records back and return.
+ // That includes the penalized record that was just polled; without this it would be
+ // dropped from the queue while still being counted in activeQueueContentSize.
+ if (record != null) {
+ activeQueue.add(record);
+ }
+ activeQueue.addAll(buffer);
+ return;
+ }
+
+ buffer.add(record);
+ contentSize += record.getSize();
+ }
+
+ activeQueueContentSize -= contentSize;
+ preFetchRef.set(new PreFetch(buffer));
+ }
+
+ // Records lock-acquiring polls over a rolling window of 5 one-second bins.
+ private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess());
+
+ /**
+ * Records one poll and reports whether polling has become frequent enough to justify a pre-fetch.
+ *
+ * @return true if more than PREFETCH_POLL_THRESHOLD * 5 polls were recorded over the last 5 seconds
+ */
+ private boolean incrementPollCount() {
+ pollCounts.add(new TimestampedLong(1L));
+
+ final long windowStart = System.currentTimeMillis() - 5000L;
+ final long pollsInWindow = pollCounts.getAggregateValue(windowStart).getValue();
+ final boolean overThreshold = pollsInWindow > PREFETCH_POLL_THRESHOLD * 5;
+ return overThreshold;
+ }
+
+ /**
+ * A batch of FlowFiles taken out of the active queue so that they can be handed out without the
+ * queue's write lock. Records are claimed by atomically advancing a pointer into the list. A batch
+ * is considered expired one second after it was created.
+ */
+ private static class PreFetch {
+
+ private final List<FlowFileRecord> records;
+ // Index of the next record to hand out; values at or beyond records.size() mean "nothing left".
+ private final AtomicInteger pointer = new AtomicInteger(0);
+ private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
+ // Total size in bytes of the records that have not been handed out yet.
+ private final AtomicLong contentSize = new AtomicLong(0L);
+
+ /**
+ * @param records the FlowFiles that make up this batch
+ */
+ public PreFetch(final List<FlowFileRecord> records) {
+ this.records = records;
+
+ long totalSize = 0L;
+ for (final FlowFileRecord record : records) {
+ totalSize += record.getSize();
+ }
+ contentSize.set(totalSize);
+ }
+
+ /**
+ * Claims the next record of the batch.
+ *
+ * @return the next record, or null if the batch is exhausted or has been requeued
+ */
+ public FlowFileRecord nextRecord() {
+ final int nextValue = pointer.getAndIncrement();
+ if (nextValue >= records.size()) {
+ return null;
+ }
+
+ final FlowFileRecord flowFile = records.get(nextValue);
+ contentSize.addAndGet(-flowFile.getSize());
+ return flowFile;
+ }
+
+ /**
+ * @return the number of records not yet handed out and their total size in bytes
+ */
+ public QueueSize size() {
+ final int pointerIndex = pointer.get();
+ final int count = records.size() - pointerIndex;
+ // Once nothing is left, report zero bytes as well: after requeue() the byte counter still
+ // holds the size of the records that were handed back to the queue.
+ if (count <= 0) {
+ return new QueueSize(0, 0L);
+ }
+
+ final long bytes = contentSize.get();
+ return new QueueSize(count, bytes);
+ }
+
+ /**
+ * @return true once more than one second has passed since this batch was created
+ */
+ public boolean isExpired() {
+ return System.nanoTime() > expirationTime;
+ }
+
+ /**
+ * Hands every record that has not been claimed yet back to the given queue and marks the batch
+ * as exhausted.
+ *
+ * @param queue the queue that receives the unclaimed records
+ * @return the total size in bytes of the records added to the queue
+ */
+ private long requeue(final Queue<FlowFileRecord> queue) {
+ // get the current pointer and prevent any other thread from accessing the rest of the elements
+ final int curPointer = pointer.getAndAdd(records.size());
+ // Every index from curPointer up to the end is unclaimed, including the case where exactly
+ // one record remains (curPointer == records.size() - 1).
+ if (curPointer < records.size()) {
+ final List<FlowFileRecord> subList = records.subList(curPointer, records.size());
+ long requeuedSize = 0L;
+ for (final FlowFileRecord record : subList) {
+ requeuedSize += record.getSize();
+ }
+
+ queue.addAll(subList);
+
+ return requeuedSize;
+ }
+ return 0L;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
new file mode 100644
index 0000000..52a4e40
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class StandardFunnel implements Funnel {
+
+ public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
+ public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+ public static final long MINIMUM_YIELD_MILLIS = 0L;
+ public static final long DEFAULT_YIELD_PERIOD = 1000L;
+ public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
+
+ private final String identifier;
+ private final Set<Connection> outgoingConnections;
+ private final List<Connection> incomingConnections;
+ private final List<Relationship> relationships;
+
+ private final AtomicReference<ProcessGroup> processGroupRef;
+ private final AtomicReference<Position> position;
+ private final AtomicReference<String> penalizationPeriod;
+ private final AtomicReference<String> yieldPeriod;
+ private final AtomicReference<String> schedulingPeriod;
+ private final AtomicReference<String> name;
+ private final AtomicLong schedulingNanos;
+ private final AtomicBoolean lossTolerant;
+ private final AtomicReference<ScheduledState> scheduledState;
+ private final AtomicLong yieldExpiration;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
+ public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) {
+ this.identifier = identifier;
+ this.processGroupRef = new AtomicReference<>(processGroup);
+
+ outgoingConnections = new HashSet<>();
+ incomingConnections = new ArrayList<>();
+
+ final List<Relationship> relationships = new ArrayList<>();
+ relationships.add(Relationship.ANONYMOUS);
+ this.relationships = Collections.unmodifiableList(relationships);
+
+ lossTolerant = new AtomicBoolean(false);
+ position = new AtomicReference<>(new Position(0D, 0D));
+ scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
+ penalizationPeriod = new AtomicReference<>("30 sec");
+ yieldPeriod = new AtomicReference<>("1 sec");
+ yieldExpiration = new AtomicLong(0L);
+ schedulingPeriod = new AtomicReference<>("0 millis");
+ schedulingNanos = new AtomicLong(30000);
+ name = new AtomicReference<>("Funnel");
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public Collection<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public Relationship getRelationship(final String relationshipName) {
+ return (Relationship.ANONYMOUS.getName().equals(relationshipName)) ? Relationship.ANONYMOUS : null;
+ }
+
+ @Override
+ public void addConnection(final Connection connection) throws IllegalArgumentException {
+ writeLock.lock();
+ try {
+ if (!requireNonNull(connection).getSource().equals(this) && !connection.getDestination().equals(this)) {
+ throw new IllegalArgumentException("Cannot add a connection to a Funnel for which the Funnel is neither the Source nor the Destination");
+ }
+ if (connection.getSource().equals(this) && connection.getDestination().equals(this)) {
+ throw new IllegalArgumentException("Cannot add a connection from a Funnel back to itself");
+ }
+
+ if (connection.getDestination().equals(this)) {
+ // don't add the connection twice. This may occur if we have a self-loop because we will be told
+ // to add the connection once because we are the source and again because we are the destination.
+ if (!incomingConnections.contains(connection)) {
+ incomingConnections.add(connection);
+ }
+ }
+
+ if (connection.getSource().equals(this)) {
+ // don't add the connection twice. This may occur if we have a self-loop because we will be told
+ // to add the connection once because we are the source and again because we are the destination.
+ if (!outgoingConnections.contains(connection)) {
+ for (final Relationship relationship : connection.getRelationships()) {
+ if (!relationship.equals(Relationship.ANONYMOUS)) {
+ throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Funnels");
+ }
+ }
+
+ outgoingConnections.add(connection);
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasIncomingConnection() {
+ readLock.lock();
+ try {
+ return !incomingConnections.isEmpty();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void updateConnection(final Connection connection) throws IllegalStateException {
+ if (requireNonNull(connection).getSource().equals(this)) {
+ writeLock.lock();
+ try {
+ if (!outgoingConnections.remove(connection)) {
+ throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
+ }
+ outgoingConnections.add(connection);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ if (connection.getDestination().equals(this)) {
+ writeLock.lock();
+ try {
+ if (!incomingConnections.remove(connection)) {
+ throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
+ }
+ incomingConnections.add(connection);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
+ writeLock.lock();
+ try {
+ if (!requireNonNull(connection).getSource().equals(this)) {
+ final boolean existed = incomingConnections.remove(connection);
+ if (!existed) {
+ throw new IllegalStateException("The given connection is not currently registered for this ProcessorNode");
+ }
+ return;
+ }
+
+ final boolean removed = outgoingConnections.remove(connection);
+ if (!removed) {
+ throw new IllegalStateException(connection + " is not registered with " + this);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public Set<Connection> getConnections() {
+ readLock.lock();
+ try {
+ return Collections.unmodifiableSet(outgoingConnections);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Set<Connection> getConnections(final Relationship relationship) {
+ readLock.lock();
+ try {
+ if (relationship.equals(Relationship.ANONYMOUS)) {
+ return Collections.unmodifiableSet(outgoingConnections);
+ }
+
+ throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Funnels");
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public List<Connection> getIncomingConnections() {
+ readLock.lock();
+ try {
+ return new ArrayList<>(incomingConnections);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Position getPosition() {
+ return position.get();
+ }
+
+ @Override
+ public void setPosition(Position position) {
+ this.position.set(position);
+ }
+
+ @Override
+ public String getName() {
+ return name.get();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}
+ *
+ * @param name
+ */
+ @Override
+ public void setName(final String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getComments() {
+ return "";
+ }
+
+ @Override
+ public void setComments(final String comments) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessGroup getProcessGroup() {
+ return processGroupRef.get();
+ }
+
+ @Override
+ public void setProcessGroup(final ProcessGroup group) {
+ processGroupRef.set(group);
+ }
+
+ @Override
+ public boolean isAutoTerminated(Relationship relationship) {
+ return false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return isRunning(this);
+ }
+
+ private boolean isRunning(final Connectable source) {
+ return getScheduledState() == ScheduledState.RUNNING;
+ }
+
+ @Override
+ public boolean isTriggerWhenEmpty() {
+ return false;
+ }
+
+ @Override
+ public ScheduledState getScheduledState() {
+ return scheduledState.get();
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return lossTolerant.get();
+ }
+
+ @Override
+ public void setLossTolerant(final boolean lossTolerant) {
+ this.lossTolerant.set(lossTolerant);
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+ final ProcessSession session = sessionFactory.createSession();
+
+ try {
+ onTrigger(context, session);
+ session.commit();
+ } catch (final ProcessException e) {
+ session.rollback();
+ throw e;
+ } catch (final Throwable t) {
+ session.rollback();
+ throw new RuntimeException(t);
+ }
+ }
+
+ private void onTrigger(final ProcessContext context, final ProcessSession session) {
+ readLock.lock();
+ try {
+ Set<Relationship> available = session.getAvailableRelationships();
+ int transferred = 0;
+ while (!available.isEmpty()) {
+ final List<FlowFile> flowFiles = session.get(10);
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+
+ transferred += flowFiles.size();
+ session.transfer(flowFiles, Relationship.ANONYMOUS);
+ session.commit();
+ available = session.getAvailableRelationships();
+ }
+
+ if (transferred == 0) {
+ context.yield();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Has no effect
+ */
+ @Override
+ public void setMaxConcurrentTasks(int taskCount) {
+ }
+
+ @Override
+ public int getMaxConcurrentTasks() {
+ return 1;
+ }
+
+ @Override
+ public void setScheduledState(final ScheduledState scheduledState) {
+ this.scheduledState.set(scheduledState);
+ }
+
+ @Override
+ public ConnectableType getConnectableType() {
+ return ConnectableType.FUNNEL;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection<ValidationResult> getValidationErrors() {
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * Updates the amount of time that this processor should avoid being
+ * scheduled when the processor calls
+ * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
+ *
+ * @param yieldPeriod
+ */
+ @Override
+ public void setYieldPeriod(final String yieldPeriod) {
+ final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
+ if (yieldMillis < 0) {
+ throw new IllegalArgumentException("Yield duration must be positive");
+ }
+ this.yieldPeriod.set(yieldPeriod);
+ }
+
+ /**
+ * @param schedulingPeriod
+ */
+ @Override
+ public void setScheduldingPeriod(final String schedulingPeriod) {
+ final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
+ if (schedulingNanos < 0) {
+ throw new IllegalArgumentException("Scheduling Period must be positive");
+ }
+
+ this.schedulingPeriod.set(schedulingPeriod);
+ this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
+ }
+
+ @Override
+ public long getPenalizationPeriod(final TimeUnit timeUnit) {
+ return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+ }
+
+ @Override
+ public String getPenalizationPeriod() {
+ return penalizationPeriod.get();
+ }
+
+ /**
+ * Causes the processor not to be scheduled for some period of time. This
+ * duration can be obtained and set via the
+ * {@link #getYieldPeriod(TimeUnit)} and
+ * {@link #setYieldPeriod(long, TimeUnit)} methods.
+ */
+ @Override
+ public void yield() {
+ final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
+ yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
+ }
+
+ @Override
+ public long getYieldExpiration() {
+ return yieldExpiration.get();
+ }
+
+ @Override
+ public String getSchedulingPeriod() {
+ return schedulingPeriod.get();
+ }
+
+ @Override
+ public void setPenalizationPeriod(final String penalizationPeriod) {
+ this.penalizationPeriod.set(penalizationPeriod);
+ }
+
+ @Override
+ public String getYieldPeriod() {
+ return yieldPeriod.get();
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+ }
+
+ @Override
+ public long getSchedulingPeriod(final TimeUnit timeUnit) {
+ return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public boolean isSideEffectFree() {
+ return true;
+ }
+
+ @Override
+ public void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException {
+ if (ignoreConnections) {
+ return;
+ }
+
+ readLock.lock();
+ try {
+ for (final Connection connection : outgoingConnections) {
+ connection.verifyCanDelete();
+ }
+
+ for (final Connection connection : incomingConnections) {
+ if (connection.getSource().equals(this)) {
+ connection.verifyCanDelete();
+ } else {
+ throw new IllegalStateException(this + " is the destination of another component");
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void verifyCanDelete() {
+ verifyCanDelete(false);
+ }
+
+ @Override
+ public void verifyCanStart() {
+ }
+
+ @Override
+ public void verifyCanStop() {
+ }
+
+ @Override
+ public void verifyCanUpdate() {
+ }
+
+ @Override
+ public void verifyCanEnable() {
+ }
+
+ @Override
+ public void verifyCanDisable() {
+ }
+
+ @Override
+ public SchedulingStrategy getSchedulingStrategy() {
+ return SchedulingStrategy.EVENT_DRIVEN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
new file mode 100644
index 0000000..df3c251
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+
+/**
+ * Factory for {@link ValidationContext} instances.
+ */
+public interface ValidationContextFactory {
+
+ /**
+  * Creates a validation context for the given inputs.
+  *
+  * @param properties property descriptors mapped to String values (presumably the configured
+  * property values of the component being validated -- confirm against implementations)
+  * @param annotationData the annotation data to make available to the context
+  * @return a new validation context
+  */
+ ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java
new file mode 100644
index 0000000..2f43600
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Connectable;
+
+/**
+ * A queue to which {@link Connectable} workers are offered and from which
+ * {@link EventBasedWorker}s are polled.
+ * NOTE(review): the exact scheduling semantics are defined by the implementations, which are
+ * not visible here; the method descriptions below are limited to what the signatures show.
+ */
+public interface WorkerQueue {
+
+ /**
+  * Retrieves a worker, taking a timeout expressed in the given unit.
+  * NOTE(review): presumably waits up to the timeout and returns <code>null</code> if no worker
+  * becomes available -- confirm against implementations.
+  */
+ EventBasedWorker poll(long timeout, TimeUnit timeUnit);
+
+ /**
+  * Offers the given worker to this queue.
+  */
+ void offer(Connectable worker);
+
+ /**
+  * Tells this queue whether it is operating in a clustered environment.
+  */
+ void setClustered(boolean clustered);
+
+ /**
+  * Tells this queue whether it is operating as the primary (presumably the primary node of a
+  * cluster -- confirm).
+  */
+ void setPrimary(boolean primary);
+
+ /**
+  * Suspends work for the given worker.
+  */
+ void suspendWork(Connectable worker);
+
+ /**
+  * Resumes work for the given worker.
+  */
+ void resumeWork(Connectable worker);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java
new file mode 100644
index 0000000..368ed1b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} that signals a communications failure.
+ */
+public class CommunicationsException extends IOException {
+
+    private static final long serialVersionUID = 142343242323423L;
+
+    /**
+     * Creates an exception with neither an explanation nor a cause.
+     */
+    public CommunicationsException() {
+        super();
+    }
+
+    /**
+     * @param explanation a description of the failure
+     */
+    public CommunicationsException(final String explanation) {
+        super(explanation);
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public CommunicationsException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param explanation a description of the failure
+     * @param cause the underlying failure
+     */
+    public CommunicationsException(final String explanation, final Throwable cause) {
+        super(explanation, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
new file mode 100644
index 0000000..0ff68b0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+/**
+ * Signals that a Controller Service with a given identifier already exists.
+ */
+public class ControllerServiceAlreadyExistsException extends RuntimeException {
+
+    private static final long serialVersionUID = -544424320587059277L;
+
+    /**
+     * Creates an exception whose message names the conflicting identifier.
+     *
+     * @param id the identifier of the Controller Service that already exists
+     */
+    public ControllerServiceAlreadyExistsException(final String id) {
+        super("A Controller Service already exists with ID " + id);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
new file mode 100644
index 0000000..4cdbe54
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+/**
+ * Signals that a Controller Service could not be found.
+ */
+public class ControllerServiceNotFoundException extends RuntimeException {
+
+    private static final long serialVersionUID = -544424320587059277L;
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public ControllerServiceNotFoundException() {
+        super();
+    }
+
+    /**
+     * @param message a description of the failure
+     */
+    public ControllerServiceNotFoundException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message a description of the failure
+     * @param cause the underlying failure
+     */
+    public ControllerServiceNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public ControllerServiceNotFoundException(Throwable cause) {
+        super(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
new file mode 100644
index 0000000..c4aba44
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+/**
+ * Checked exception signalling that a processor could not be instantiated.
+ */
+public class ProcessorInstantiationException extends Exception {
+
+    private static final long serialVersionUID = 189273489L;
+
+    /**
+     * @param className used as the exception message (by its name, the class name of the
+     * processor that could not be instantiated)
+     * @param t the underlying failure
+     */
+    public ProcessorInstantiationException(final String className, final Throwable t) {
+        super(className, t);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java
new file mode 100644
index 0000000..5acca16
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+/**
+ * Unchecked exception for failures related to the life cycle of a processor.
+ */
+public class ProcessorLifeCycleException extends RuntimeException {
+
+    private static final long serialVersionUID = 8392341500511490941L;
+
+    /**
+     * @param t the underlying failure
+     */
+    public ProcessorLifeCycleException(final Throwable t) {
+        super(t);
+    }
+
+    /**
+     * @param message a description of the failure
+     * @param t the underlying failure
+     */
+    public ProcessorLifeCycleException(final String message, final Throwable t) {
+        super(message, t);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java
new file mode 100644
index 0000000..97c44b5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.label;
+
+import java.util.Map;
+
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.groups.ProcessGroup;
+
+/**
+ * Contract for a label: an element that has an identifier, a position, a size, a style map, a
+ * String value and an owning {@link ProcessGroup}.
+ * NOTE(review): the descriptions below are limited to what the signatures show; confirm any
+ * further semantics against the implementations.
+ */
+public interface Label {
+
+ /**
+  * @return the identifier of this label
+  */
+ String getIdentifier();
+
+ /**
+  * @return the position of this label
+  */
+ Position getPosition();
+
+ /**
+  * @param position the new position of this label
+  */
+ void setPosition(Position position);
+
+ /**
+  * @return the style of this label as a map of String keys to String values
+  */
+ Map<String, String> getStyle();
+
+ /**
+  * @param style the new style of this label
+  */
+ void setStyle(Map<String, String> style);
+
+ /**
+  * @return the size of this label
+  */
+ Size getSize();
+
+ /**
+  * @param size the new size of this label
+  */
+ void setSize(Size size);
+
+ /**
+  * @return the process group associated with this label
+  */
+ ProcessGroup getProcessGroup();
+
+ /**
+  * @param group the process group to associate with this label
+  */
+ void setProcessGroup(ProcessGroup group);
+
+ /**
+  * @return the value of this label (presumably its displayed text -- confirm)
+  */
+ String getValue();
+
+ /**
+  * @param value the new value of this label
+  */
+ void setValue(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java
new file mode 100644
index 0000000..ced6ff9
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+/** Checked exception (it extends {@link Exception}) named for failures to instantiate a reporting task. */
+public class ReportingTaskInstantiationException extends Exception {
+ /** Serialization version identifier. */
+ private static final long serialVersionUID = 189234789237L;
+ /** Passes {@code className} to {@link Exception} as the detail message and {@code t} as the cause. */
+ public ReportingTaskInstantiationException(final String className, final Throwable t) {
+ super(className, t);
+ }
+ /** Creates the exception with {@code message} as the detail message and no cause. */
+ public ReportingTaskInstantiationException(final String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
new file mode 100644
index 0000000..6ce7ba6
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+
+/**
+ * Unchecked exception reporting that the content for a {@link ContentClaim} could not be found.
+ * The claim field is transient, so default Java serialization does not write it out.
+ */
+public class ContentNotFoundException extends RuntimeException {
+ /** Serialization version identifier. */
+ private static final long serialVersionUID = 19048239082L;
+ private final transient ContentClaim claim;
+ /** Uses {@code "Could not find content for " + claim} as the detail message and keeps the claim. */
+ public ContentNotFoundException(final ContentClaim claim) {
+ super("Could not find content for " + claim);
+ this.claim = claim;
+ }
+ /** Uses {@code "Could not find content for " + claim} as the detail message, with {@code t} as the cause. */
+ public ContentNotFoundException(final ContentClaim claim, final Throwable t) {
+ super("Could not find content for " + claim, t);
+ this.claim = claim;
+ }
+ /** Uses {@code "Could not find content for " + claim + ": " + message} as the detail message. */
+ public ContentNotFoundException(final ContentClaim claim, final String message) {
+ super("Could not find content for " + claim + ": " + message);
+ this.claim = claim;
+ }
+ /** @return the claim that was passed to the constructor */
+ public ContentClaim getMissingClaim() {
+ return claim;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java
new file mode 100644
index 0000000..de231ed
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.util.List;
+
+import org.apache.nifi.controller.Counter;
+/** Repository of {@link Counter}s that can be adjusted, looked up, listed and reset. */
+public interface CounterRepository {
+ /** Adjusts the counter {@code name} in {@code counterContext} by {@code delta} (presumably added to its value; confirm). */
+ void adjustCounter(String counterContext, String name, long delta);
+ /** @return the counter {@code name} in {@code counterContext}; the result for an unknown counter is not specified here */
+ Counter getCounter(String counterContext, String name);
+ /** @return the counters held by this repository (presumably across all contexts; confirm) */
+ List<Counter> getCounters();
+ /** @return the counters for the given {@code counterContext} */
+ List<Counter> getCounters(String counterContext);
+ /** Resets the counter with the given identifier. NOTE(review): pre- or post-reset state of the returned Counter is not specified here. */
+ Counter resetCounter(String identifier);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
new file mode 100644
index 0000000..f07a530
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+/** Getter-only view of statistics tied to a component identifier: FlowFile counts, content sizes, bytes, timings, invocations. */
+public interface FlowFileEvent {
+ /** @return the identifier of the component this event refers to */
+ String getComponentIdentifier();
+ /** @return the count of incoming FlowFiles */
+ int getFlowFilesIn();
+ /** @return the count of outgoing FlowFiles */
+ int getFlowFilesOut();
+ /** @return the count of removed FlowFiles */
+ int getFlowFilesRemoved();
+ /** @return the content size of incoming FlowFiles (presumably in bytes; confirm) */
+ long getContentSizeIn();
+ /** @return the content size of outgoing FlowFiles (presumably in bytes; confirm) */
+ long getContentSizeOut();
+ /** @return the content size of removed FlowFiles (presumably in bytes; confirm) */
+ long getContentSizeRemoved();
+ /** @return the number of bytes read */
+ long getBytesRead();
+ /** @return the number of bytes written */
+ long getBytesWritten();
+ /** @return the processing time, in nanoseconds as the method name indicates */
+ long getProcessingNanoseconds();
+ /** @return the average lineage duration in milliseconds; how the average is computed is not specified here */
+ long getAverageLineageMillis();
+ /** @return the aggregate lineage duration in milliseconds (presumably a sum across FlowFiles; confirm) */
+ long getAggregateLineageMillis();
+ /** @return the count of FlowFiles received */
+ int getFlowFilesReceived();
+ /** @return the number of bytes received */
+ long getBytesReceived();
+ /** @return the count of FlowFiles sent */
+ int getFlowFilesSent();
+ /** @return the number of bytes sent */
+ long getBytesSent();
+ /** @return the number of invocations recorded in this event */
+ int getInvocations();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
new file mode 100644
index 0000000..2eb3caf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.repository;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Closeable repository of {@link FlowFileEvent}s that can be updated, reported on and purged.
+ * Time arguments are expressed in milliseconds since the epoch, as their parameter names state.
+ */
+public interface FlowFileEventRepository extends Closeable {
+
+ /**
+ * Updates the repository to include a new FlowFile processing event.
+ *
+ * @param event the FlowFile processing event to include
+ * @throws java.io.IOException if the update fails; the exact conditions are left to implementations
+ */
+ void updateRepository(FlowFileEvent event) throws IOException;
+
+ /**
+ * Returns a report of processing activity since the given time.
+ * @param sinceEpochMillis start of the reporting window, in milliseconds since the epoch
+ * @return a {@link RepositoryStatusReport} describing activity since that time
+ */
+ RepositoryStatusReport reportTransferEvents(long sinceEpochMillis);
+
+ /**
+ * Causes any flow file events of the given entry age in epoch milliseconds
+ * or older to be purged from the repository.
+ *
+ * @param cutoffEpochMilliseconds cutoff in epoch milliseconds; events of this age or older are purged
+ */
+ void purgeTransferEvents(long cutoffEpochMilliseconds);
+}