You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/21 07:48:19 UTC
[11/51] [partial] incubator-nifi git commit: NIFI-270 Made all
changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
deleted file mode 100644
index 59d2308..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ /dev/null
@@ -1,1096 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
-import org.apache.nifi.processor.QueueSize;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
- * processing. Must be thread safe.
- *
- * @author none
- */
-public final class StandardFlowFileQueue implements FlowFileQueue {
-
- public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
- public static final int SWAP_RECORD_POLL_SIZE = 10000;
-
- // When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck. In order to avoid this,
- // we keep track of how often we are obtaining the write lock. If we exceed some threshold, we start performing a Pre-fetch so that
- // we can then poll many times without having to obtain the lock.
- // If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to poll from queue for last 5 seconds, do a pre-fetch.
- public static final int PREFETCH_POLL_THRESHOLD = 1000;
- public static final int PRIORITIZED_PREFETCH_SIZE = 10;
- public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000;
- private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch?
-
- private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
-
- private PriorityQueue<FlowFileRecord> activeQueue = null;
- private long activeQueueContentSize = 0L;
- private ArrayList<FlowFileRecord> swapQueue = null;
-
- private int swappedRecordCount = 0;
- private long swappedContentSize = 0L;
- private String maximumQueueDataSize;
- private long maximumQueueByteCount;
- private boolean swapMode = false;
- private long maximumQueueObjectCount;
-
- private final AtomicLong flowFileExpirationMillis;
- private final Connection connection;
- private final AtomicReference<String> flowFileExpirationPeriod;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private final List<FlowFilePrioritizer> priorities;
- private final int swapThreshold;
- private final TimedLock readLock;
- private final TimedLock writeLock;
- private final String identifier;
-
- private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
- private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
- private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
-
- // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
- private final ProcessScheduler scheduler;
-
- public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) {
- activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
- priorities = new ArrayList<>();
- maximumQueueObjectCount = 0L;
- maximumQueueDataSize = "0 MB";
- maximumQueueByteCount = 0L;
- flowFileExpirationMillis = new AtomicLong(0);
- flowFileExpirationPeriod = new AtomicReference<>("0 mins");
- swapQueue = new ArrayList<>();
-
- this.identifier = identifier;
- this.swapThreshold = swapThreshold;
- this.scheduler = scheduler;
- this.connection = connection;
-
- readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
- writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
- }
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public List<FlowFilePrioritizer> getPriorities() {
- return Collections.unmodifiableList(priorities);
- }
-
- @Override
- public int getSwapThreshold() {
- return swapThreshold;
- }
-
- @Override
- public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
- writeLock.lock();
- try {
- final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(newPriorities));
- newQueue.addAll(activeQueue);
- activeQueue = newQueue;
- priorities.clear();
- priorities.addAll(newPriorities);
-
- if (newPriorities.isEmpty()) {
- prefetchSize = UNPRIORITIZED_PREFETCH_SIZE;
- } else {
- prefetchSize = PRIORITIZED_PREFETCH_SIZE;
- }
- } finally {
- writeLock.unlock("setPriorities");
- }
- }
-
- @Override
- public void setBackPressureObjectThreshold(final long maxQueueSize) {
- writeLock.lock();
- try {
- maximumQueueObjectCount = maxQueueSize;
- this.queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("setBackPressureObjectThreshold");
- }
- }
-
- @Override
- public long getBackPressureObjectThreshold() {
- readLock.lock();
- try {
- return maximumQueueObjectCount;
- } finally {
- readLock.unlock("getBackPressureObjectThreshold");
- }
- }
-
- @Override
- public void setBackPressureDataSizeThreshold(final String maxDataSize) {
- writeLock.lock();
- try {
- maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
- maximumQueueDataSize = maxDataSize;
- this.queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("setBackPressureDataSizeThreshold");
- }
- }
-
- @Override
- public String getBackPressureDataSizeThreshold() {
- readLock.lock();
- try {
- return maximumQueueDataSize;
- } finally {
- readLock.unlock("getBackPressureDataSizeThreshold");
- }
- }
-
- @Override
- public QueueSize size() {
- readLock.lock();
- try {
- return getQueueSize();
- } finally {
- readLock.unlock("getSize");
- }
- }
-
- /**
- * MUST be called with lock held
- *
- * @return
- */
- private QueueSize getQueueSize() {
- final QueueSize unacknowledged = unacknowledgedSizeRef.get();
- final PreFetch preFetch = preFetchRef.get();
-
- final int preFetchCount;
- final long preFetchSize;
- if (preFetch == null) {
- preFetchCount = 0;
- preFetchSize = 0L;
- } else {
- final QueueSize preFetchQueueSize = preFetch.size();
- preFetchCount = preFetchQueueSize.getObjectCount();
- preFetchSize = preFetchQueueSize.getByteCount();
- }
-
- return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount,
- activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize);
- }
-
- @Override
- public long contentSize() {
- readLock.lock();
- try {
- final PreFetch prefetch = preFetchRef.get();
- if (prefetch == null) {
- return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount();
- } else {
- return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount();
- }
- } finally {
- readLock.unlock("getContentSize");
- }
- }
-
- @Override
- public boolean isEmpty() {
- readLock.lock();
- try {
- final PreFetch prefetch = preFetchRef.get();
- if (prefetch == null) {
- return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
- } else {
- return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0 && prefetch.size().getObjectCount() == 0;
- }
- } finally {
- readLock.unlock("isEmpty");
- }
- }
-
- @Override
- public boolean isActiveQueueEmpty() {
- final int activeQueueSize = activeQueueSizeRef.get();
- if (activeQueueSize == 0) {
- final PreFetch preFetch = preFetchRef.get();
- if (preFetch == null) {
- return true;
- }
-
- final QueueSize queueSize = preFetch.size();
- return queueSize.getObjectCount() == 0;
- } else {
- return false;
- }
- }
-
- @Override
- public QueueSize getActiveQueueSize() {
- readLock.lock();
- try {
- final PreFetch preFetch = preFetchRef.get();
- if (preFetch == null) {
- return new QueueSize(activeQueue.size(), activeQueueContentSize);
- } else {
- final QueueSize preFetchSize = preFetch.size();
- return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount());
- }
- } finally {
- readLock.unlock("getActiveQueueSize");
- }
- }
-
- @Override
- public void acknowledge(final FlowFileRecord flowFile) {
- if (queueFullRef.get()) {
- writeLock.lock();
- try {
- updateUnacknowledgedSize(-1, -flowFile.getSize());
- queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("acknowledge(FlowFileRecord)");
- }
- } else {
- updateUnacknowledgedSize(-1, -flowFile.getSize());
- }
-
- if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- // queue was full but no longer is. Notify that the source may now be available to run,
- // because of back pressure caused by this queue.
- scheduler.registerEvent(connection.getSource());
- }
- }
-
- @Override
- public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
- long totalSize = 0L;
- for (final FlowFileRecord flowFile : flowFiles) {
- totalSize += flowFile.getSize();
- }
-
- if (queueFullRef.get()) {
- writeLock.lock();
- try {
- updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
- queueFullRef.set(determineIfFull());
- } finally {
- writeLock.unlock("acknowledge(FlowFileRecord)");
- }
- } else {
- updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
- }
-
- if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
- // because of back pressure caused by this queue.
- scheduler.registerEvent(connection.getSource());
- }
- }
-
- @Override
- public boolean isFull() {
- return queueFullRef.get();
- }
-
- /**
- * MUST be called with either the read or write lock held
- *
- * @return
- */
- private boolean determineIfFull() {
- final long maxSize = maximumQueueObjectCount;
- final long maxBytes = maximumQueueByteCount;
- if (maxSize <= 0 && maxBytes <= 0) {
- return false;
- }
-
- final QueueSize queueSize = getQueueSize();
- if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
- return true;
- }
-
- if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) {
- return true;
- }
-
- return false;
- }
-
- @Override
- public void put(final FlowFileRecord file) {
- writeLock.lock();
- try {
- if (swapMode || activeQueue.size() >= swapThreshold) {
- swapQueue.add(file);
- swappedContentSize += file.getSize();
- swappedRecordCount++;
- swapMode = true;
- } else {
- activeQueueContentSize += file.getSize();
- activeQueue.add(file);
- }
-
- queueFullRef.set(determineIfFull());
- } finally {
- activeQueueSizeRef.set(activeQueue.size());
- writeLock.unlock("put(FlowFileRecord)");
- }
-
- if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- scheduler.registerEvent(connection.getDestination());
- }
- }
-
- @Override
- public void putAll(final Collection<FlowFileRecord> files) {
- final int numFiles = files.size();
- long bytes = 0L;
- for (final FlowFile flowFile : files) {
- bytes += flowFile.getSize();
- }
-
- writeLock.lock();
- try {
- if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
- swapQueue.addAll(files);
- swappedContentSize += bytes;
- swappedRecordCount += numFiles;
- swapMode = true;
- } else {
- activeQueueContentSize += bytes;
- activeQueue.addAll(files);
- }
-
- queueFullRef.set(determineIfFull());
- } finally {
- activeQueueSizeRef.set(activeQueue.size());
- writeLock.unlock("putAll");
- }
-
- if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
- scheduler.registerEvent(connection.getDestination());
- }
- }
-
- @Override
- public List<FlowFileRecord> pollSwappableRecords() {
- writeLock.lock();
- try {
- if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
- return null;
- }
-
- final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
- final Iterator<FlowFileRecord> itr = swapQueue.iterator();
- while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
- FlowFileRecord record = itr.next();
- swapRecords.add(record);
- itr.remove();
- }
-
- swapQueue.trimToSize();
- return swapRecords;
- } finally {
- writeLock.unlock("pollSwappableRecords");
- }
- }
-
- @Override
- public void putSwappedRecords(final Collection<FlowFileRecord> records) {
- writeLock.lock();
- try {
- try {
- for (final FlowFileRecord record : records) {
- swappedContentSize -= record.getSize();
- swappedRecordCount--;
- activeQueueContentSize += record.getSize();
- activeQueue.add(record);
- }
-
- if (swappedRecordCount > swapQueue.size()) {
- // we have more swap files to be swapped in.
- return;
- }
-
- // If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix
- if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
- for (final FlowFileRecord record : swapQueue) {
- activeQueue.add(record);
- activeQueueContentSize += record.getSize();
- }
- swapQueue.clear();
- swappedContentSize = 0L;
- swappedRecordCount = 0;
- swapMode = false;
- }
- } finally {
- activeQueueSizeRef.set(activeQueue.size());
- }
- } finally {
- writeLock.unlock("putSwappedRecords");
- scheduler.registerEvent(connection.getDestination());
- }
- }
-
- @Override
- public void incrementSwapCount(final int numRecords, final long contentSize) {
- writeLock.lock();
- try {
- swappedContentSize += contentSize;
- swappedRecordCount += numRecords;
- } finally {
- writeLock.unlock("incrementSwapCount");
- }
- }
-
- @Override
- public int unswappedSize() {
- readLock.lock();
- try {
- return activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount();
- } finally {
- readLock.unlock("unswappedSize");
- }
- }
-
- @Override
- public int getSwapRecordCount() {
- readLock.lock();
- try {
- return swappedRecordCount;
- } finally {
- readLock.unlock("getSwapRecordCount");
- }
- }
-
- @Override
- public int getSwapQueueSize() {
- readLock.lock();
- try {
- if (logger.isDebugEnabled()) {
- final long byteToMbDivisor = 1024L * 1024L;
- final QueueSize unacknowledged = unacknowledgedSizeRef.get();
-
- logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB",
- activeQueue.size(), activeQueueContentSize / byteToMbDivisor,
- swappedRecordCount, swappedContentSize / byteToMbDivisor,
- unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor);
- }
-
- return swapQueue.size();
- } finally {
- readLock.unlock("getSwapQueueSize");
- }
- }
-
- private boolean isLaterThan(final Long maxAge) {
- if (maxAge == null) {
- return false;
- }
- return maxAge < System.currentTimeMillis();
- }
-
- private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
- if (flowFile == null) {
- return null;
- }
- if (expirationMillis <= 0) {
- return null;
- } else {
- final long entryDate = flowFile.getEntryDate();
- final long expirationDate = entryDate + expirationMillis;
- return expirationDate;
- }
- }
-
- @Override
- public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
- FlowFileRecord flowFile = null;
-
- // First check if we have any records Pre-Fetched.
- final long expirationMillis = flowFileExpirationMillis.get();
- final PreFetch preFetch = preFetchRef.get();
- if (preFetch != null) {
- if (preFetch.isExpired()) {
- requeueExpiredPrefetch(preFetch);
- } else {
- while (true) {
- final FlowFileRecord next = preFetch.nextRecord();
- if (next == null) {
- break;
- }
-
- if (isLaterThan(getExpirationDate(next, expirationMillis))) {
- expiredRecords.add(next);
- continue;
- }
-
- updateUnacknowledgedSize(1, next.getSize());
- return next;
- }
-
- preFetchRef.compareAndSet(preFetch, null);
- }
- }
-
- writeLock.lock();
- try {
- flowFile = doPoll(expiredRecords, expirationMillis);
- return flowFile;
- } finally {
- activeQueueSizeRef.set(activeQueue.size());
- writeLock.unlock("poll(Set)");
-
- if (flowFile != null) {
- updateUnacknowledgedSize(1, flowFile.getSize());
- }
- }
- }
-
- private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
- FlowFileRecord flowFile;
- boolean isExpired;
-
- migrateSwapToActive();
- boolean queueFullAtStart = queueFullRef.get();
-
- do {
- flowFile = this.activeQueue.poll();
-
- isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
- if (isExpired) {
- expiredRecords.add(flowFile);
- if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
- activeQueueContentSize -= flowFile.getSize();
- break;
- }
- } else if (flowFile != null && flowFile.isPenalized()) {
- this.activeQueue.add(flowFile);
- flowFile = null;
- break;
- }
-
- if (flowFile != null) {
- activeQueueContentSize -= flowFile.getSize();
- }
- } while (isExpired);
-
- // if at least 1 FlowFile was expired & the queue was full before we started, then
- // we need to determine whether or not the queue is full again. If no FlowFile was expired,
- // then the queue will still be full until the appropriate #acknowledge method is called.
- if (queueFullAtStart && !expiredRecords.isEmpty()) {
- queueFullRef.set(determineIfFull());
- }
-
- if (incrementPollCount()) {
- prefetch();
- }
- return isExpired ? null : flowFile;
- }
-
- @Override
- public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
- final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults));
-
- // First check if we have any records Pre-Fetched.
- final long expirationMillis = flowFileExpirationMillis.get();
- final PreFetch preFetch = preFetchRef.get();
- if (preFetch != null) {
- if (preFetch.isExpired()) {
- requeueExpiredPrefetch(preFetch);
- } else {
- long totalSize = 0L;
- for (int i = 0; i < maxResults; i++) {
- final FlowFileRecord next = preFetch.nextRecord();
- if (next == null) {
- break;
- }
-
- if (isLaterThan(getExpirationDate(next, expirationMillis))) {
- expiredRecords.add(next);
- continue;
- }
-
- records.add(next);
- totalSize += next.getSize();
- }
-
- // If anything was prefetched, use what we have.
- if (!records.isEmpty()) {
- updateUnacknowledgedSize(records.size(), totalSize);
- return records;
- }
-
- preFetchRef.compareAndSet(preFetch, null);
- }
- }
-
- writeLock.lock();
- try {
- doPoll(records, maxResults, expiredRecords);
- } finally {
- activeQueueSizeRef.set(activeQueue.size());
- writeLock.unlock("poll(int, Set)");
- }
- return records;
- }
-
- private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
- migrateSwapToActive();
-
- final boolean queueFullAtStart = queueFullRef.get();
-
- final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
-
- long expiredBytes = 0L;
- for (final FlowFileRecord record : expiredRecords) {
- expiredBytes += record.getSize();
- }
-
- activeQueueContentSize -= bytesDrained;
- updateUnacknowledgedSize(records.size(), bytesDrained - expiredBytes);
-
- // if at least 1 FlowFile was expired & the queue was full before we started, then
- // we need to determine whether or not the queue is full again. If no FlowFile was expired,
- // then the queue will still be full until the appropriate #acknowledge method is called.
- if (queueFullAtStart && !expiredRecords.isEmpty()) {
- queueFullRef.set(determineIfFull());
- }
-
- if (incrementPollCount()) {
- prefetch();
- }
- }
-
- /**
- * If there are FlowFiles waiting on the swap queue, move them to the active
- * queue until we meet our threshold. This prevents us from having to swap
- * them to disk & then back out.
- *
- * This method MUST be called with the writeLock held.
- */
- private void migrateSwapToActive() {
- // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't
- // have to swap them out & then swap them back in.
- // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles
- // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out.
- // In particular, this can happen if the queue is typically filled with surges.
- // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave
- // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in,
- // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and
- // swapped back in again.
- // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
- // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
- // to disk, because we want them to be swapped back in in the same order that they were swapped out.
-
- // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
- // of other checks for 99.999% of the cases.
- if (swappedRecordCount == 0 && swapQueue.isEmpty()) {
- return;
- }
-
- if (swappedRecordCount > swapQueue.size()) {
- // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
- // an external process to swap FlowFiles back in.
- return;
- }
-
- final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
- while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
- final FlowFileRecord toMigrate = swapItr.next();
- activeQueue.add(toMigrate);
- activeQueueContentSize += toMigrate.getSize();
- swappedContentSize -= toMigrate.getSize();
- swappedRecordCount--;
-
- swapItr.remove();
- }
-
- if (swappedRecordCount == 0) {
- swapMode = false;
- }
- }
-
- @Override
- public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
- long drainedSize = 0L;
- FlowFileRecord pulled = null;
-
- final long expirationMillis = this.flowFileExpirationMillis.get();
- while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
- if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
- expiredRecords.add(pulled);
- if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
- break;
- }
- } else {
- if (pulled.isPenalized()) {
- sourceQueue.add(pulled);
- break;
- }
- destination.add(pulled);
- }
- drainedSize += pulled.getSize();
- }
- return drainedSize;
- }
-
- @Override
- public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
- writeLock.lock();
- try {
- migrateSwapToActive();
- if (activeQueue.isEmpty()) {
- return Collections.emptyList();
- }
-
- final long expirationMillis = this.flowFileExpirationMillis.get();
- final boolean queueFullAtStart = queueFullRef.get();
-
- final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
- final List<FlowFileRecord> unselected = new ArrayList<>();
-
- while (true) {
- FlowFileRecord flowFile = this.activeQueue.poll();
- if (flowFile == null) {
- break;
- }
-
- final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
- if (isExpired) {
- expiredRecords.add(flowFile);
- activeQueueContentSize -= flowFile.getSize();
-
- if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
- break;
- } else {
- continue;
- }
- } else if (flowFile.isPenalized()) {
- this.activeQueue.add(flowFile);
- flowFile = null;
- break; // just stop searching because the rest are all penalized.
- }
-
- final FlowFileFilterResult result = filter.filter(flowFile);
- if (result.isAccept()) {
- activeQueueContentSize -= flowFile.getSize();
-
- updateUnacknowledgedSize(1, flowFile.getSize());
- selectedFlowFiles.add(flowFile);
- } else {
- unselected.add(flowFile);
- }
-
- if (!result.isContinue()) {
- break;
- }
- }
-
- this.activeQueue.addAll(unselected);
-
- // if at least 1 FlowFile was expired & the queue was full before we started, then
- // we need to determine whether or not the queue is full again. If no FlowFile was expired,
- // then the queue will still be full until the appropriate #acknowledge method is called.
- if (queueFullAtStart && !expiredRecords.isEmpty()) {
- queueFullRef.set(determineIfFull());
- }
-
- return selectedFlowFiles;
- } finally {
- activeQueueSizeRef.set(activeQueue.size());
- writeLock.unlock("poll(Filter, Set)");
- }
- }
-
- private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
-
- private static final long serialVersionUID = 1L;
- private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
-
- private Prioritizer(final List<FlowFilePrioritizer> priorities) {
- if (null != priorities) {
- prioritizers.addAll(priorities);
- }
- }
-
- @Override
- public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
- int returnVal = 0;
- final boolean f1Penalized = f1.isPenalized();
- final boolean f2Penalized = f2.isPenalized();
-
- if (f1Penalized && !f2Penalized) {
- return 1;
- } else if (!f1Penalized && f2Penalized) {
- return -1;
- }
-
- if (f1Penalized && f2Penalized) {
- if (f1.getPenaltyExpirationMillis() < f2.getPenaltyExpirationMillis()) {
- return -1;
- } else if (f1.getPenaltyExpirationMillis() > f2.getPenaltyExpirationMillis()) {
- return 1;
- }
- }
-
- if (!prioritizers.isEmpty()) {
- for (final FlowFilePrioritizer prioritizer : prioritizers) {
- returnVal = prioritizer.compare(f1, f2);
- if (returnVal != 0) {
- return returnVal;
- }
- }
- }
-
- final ContentClaim claim1 = f1.getContentClaim();
- final ContentClaim claim2 = f2.getContentClaim();
-
- // put the one without a claim first
- if (claim1 == null && claim2 != null) {
- return -1;
- } else if (claim1 != null && claim2 == null) {
- return 1;
- } else if (claim1 != null && claim2 != null) {
- final int claimComparison = claim1.compareTo(claim2);
- if (claimComparison != 0) {
- return claimComparison;
- }
-
- final int claimOffsetComparison = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
- if (claimOffsetComparison != 0) {
- return claimOffsetComparison;
- }
- }
-
- return Long.compare(f1.getId(), f2.getId());
- }
- }
-
- @Override
- public String getFlowFileExpiration() {
- return flowFileExpirationPeriod.get();
- }
-
- @Override
- public int getFlowFileExpiration(final TimeUnit timeUnit) {
- return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void setFlowFileExpiration(final String flowExpirationPeriod) {
- final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
- if (millis < 0) {
- throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
- }
- this.flowFileExpirationPeriod.set(flowExpirationPeriod);
- this.flowFileExpirationMillis.set(millis);
- }
-
- @Override
- public String toString() {
- return "FlowFileQueue[id=" + identifier + "]";
- }
-
- /**
- * Lock the queue so that other threads are unable to interact with the
- * queue
- */
- public void lock() {
- writeLock.lock();
- }
-
- /**
- * Unlock the queue
- */
- public void unlock() {
- writeLock.unlock("external unlock");
- }
-
- private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
- boolean updated = false;
-
- do {
- QueueSize queueSize = unacknowledgedSizeRef.get();
- final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
- updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
- } while (!updated);
- }
-
- private void requeueExpiredPrefetch(final PreFetch prefetch) {
- if (prefetch == null) {
- return;
- }
-
- writeLock.lock();
- try {
- final long contentSizeRequeued = prefetch.requeue(activeQueue);
- this.activeQueueContentSize += contentSizeRequeued;
- this.preFetchRef.compareAndSet(prefetch, null);
- } finally {
- writeLock.unlock("requeueExpiredPrefetch");
- }
- }
-
- /**
- * MUST be called with write lock held.
- */
- private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>();
-
- private void prefetch() {
- if (activeQueue.isEmpty()) {
- return;
- }
-
- final int numToFetch = Math.min(prefetchSize, activeQueue.size());
-
- final PreFetch curPreFetch = preFetchRef.get();
- if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) {
- return;
- }
-
- final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch);
- long contentSize = 0L;
- for (int i = 0; i < numToFetch; i++) {
- final FlowFileRecord record = activeQueue.poll();
- if (record == null || record.isPenalized()) {
- // not enough unpenalized records to pull. Put all records back and return
- activeQueue.addAll(buffer);
- if ( record != null ) {
- activeQueue.add(record);
- }
- return;
- } else {
- buffer.add(record);
- contentSize += record.getSize();
- }
- }
-
- activeQueueContentSize -= contentSize;
- preFetchRef.set(new PreFetch(buffer));
- }
-
- private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess());
-
- private boolean incrementPollCount() {
- pollCounts.add(new TimestampedLong(1L));
- final long totalCount = pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue();
- return totalCount > PREFETCH_POLL_THRESHOLD * 5;
- }
-
- private static class PreFetch {
-
- private final List<FlowFileRecord> records;
- private final AtomicInteger pointer = new AtomicInteger(0);
- private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
- private final AtomicLong contentSize = new AtomicLong(0L);
-
- public PreFetch(final List<FlowFileRecord> records) {
- this.records = records;
-
- long totalSize = 0L;
- for (final FlowFileRecord record : records) {
- totalSize += record.getSize();
- }
- contentSize.set(totalSize);
- }
-
- public FlowFileRecord nextRecord() {
- final int nextValue = pointer.getAndIncrement();
- if (nextValue >= records.size()) {
- return null;
- }
-
- final FlowFileRecord flowFile = records.get(nextValue);
- contentSize.addAndGet(-flowFile.getSize());
- return flowFile;
- }
-
- public QueueSize size() {
- final int pointerIndex = pointer.get();
- final int count = records.size() - pointerIndex;
- if (count < 0) {
- return new QueueSize(0, 0L);
- }
-
- final long bytes = contentSize.get();
- return new QueueSize(count, bytes);
- }
-
- public boolean isExpired() {
- return System.nanoTime() > expirationTime;
- }
-
- private long requeue(final Queue<FlowFileRecord> queue) {
- // get the current pointer and prevent any other thread from accessing the rest of the elements
- final int curPointer = pointer.getAndAdd(records.size());
- if (curPointer < records.size() - 1) {
- final List<FlowFileRecord> subList = records.subList(curPointer, records.size());
- long contentSize = 0L;
- for (final FlowFileRecord record : subList) {
- contentSize += record.getSize();
- }
-
- queue.addAll(subList);
-
- return contentSize;
- }
- return 0L;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
deleted file mode 100644
index e34e043..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.FormatUtils;
-
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-public class StandardFunnel implements Funnel {
-
- public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
- public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
- public static final long MINIMUM_YIELD_MILLIS = 0L;
- public static final long DEFAULT_YIELD_PERIOD = 1000L;
- public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
-
- private final String identifier;
- private final Set<Connection> outgoingConnections;
- private final List<Connection> incomingConnections;
- private final List<Relationship> relationships;
-
- private final AtomicReference<ProcessGroup> processGroupRef;
- private final AtomicReference<Position> position;
- private final AtomicReference<String> penalizationPeriod;
- private final AtomicReference<String> yieldPeriod;
- private final AtomicReference<String> schedulingPeriod;
- private final AtomicReference<String> name;
- private final AtomicLong schedulingNanos;
- private final AtomicBoolean lossTolerant;
- private final AtomicReference<ScheduledState> scheduledState;
- private final AtomicLong yieldExpiration;
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
-
- public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) {
- this.identifier = identifier;
- this.processGroupRef = new AtomicReference<>(processGroup);
-
- outgoingConnections = new HashSet<>();
- incomingConnections = new ArrayList<>();
-
- final List<Relationship> relationships = new ArrayList<>();
- relationships.add(Relationship.ANONYMOUS);
- this.relationships = Collections.unmodifiableList(relationships);
-
- lossTolerant = new AtomicBoolean(false);
- position = new AtomicReference<>(new Position(0D, 0D));
- scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
- penalizationPeriod = new AtomicReference<>("30 sec");
- yieldPeriod = new AtomicReference<>("1 sec");
- yieldExpiration = new AtomicLong(0L);
- schedulingPeriod = new AtomicReference<>("0 millis");
- schedulingNanos = new AtomicLong(30000);
- name = new AtomicReference<>("Funnel");
- }
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public Collection<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- public Relationship getRelationship(final String relationshipName) {
- return (Relationship.ANONYMOUS.getName().equals(relationshipName)) ? Relationship.ANONYMOUS : null;
- }
-
- @Override
- public void addConnection(final Connection connection) throws IllegalArgumentException {
- writeLock.lock();
- try {
- if (!requireNonNull(connection).getSource().equals(this) && !connection.getDestination().equals(this)) {
- throw new IllegalArgumentException("Cannot add a connection to a Funnel for which the Funnel is neither the Source nor the Destination");
- }
- if (connection.getSource().equals(this) && connection.getDestination().equals(this)) {
- throw new IllegalArgumentException("Cannot add a connection from a Funnel back to itself");
- }
-
- if (connection.getDestination().equals(this)) {
- // don't add the connection twice. This may occur if we have a self-loop because we will be told
- // to add the connection once because we are the source and again because we are the destination.
- if (!incomingConnections.contains(connection)) {
- incomingConnections.add(connection);
- }
- }
-
- if (connection.getSource().equals(this)) {
- // don't add the connection twice. This may occur if we have a self-loop because we will be told
- // to add the connection once because we are the source and again because we are the destination.
- if (!outgoingConnections.contains(connection)) {
- for (final Relationship relationship : connection.getRelationships()) {
- if (!relationship.equals(Relationship.ANONYMOUS)) {
- throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Funnels");
- }
- }
-
- outgoingConnections.add(connection);
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public boolean hasIncomingConnection() {
- readLock.lock();
- try {
- return !incomingConnections.isEmpty();
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public void updateConnection(final Connection connection) throws IllegalStateException {
- if (requireNonNull(connection).getSource().equals(this)) {
- writeLock.lock();
- try {
- if (!outgoingConnections.remove(connection)) {
- throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
- }
- outgoingConnections.add(connection);
- } finally {
- writeLock.unlock();
- }
- }
-
- if (connection.getDestination().equals(this)) {
- writeLock.lock();
- try {
- if (!incomingConnections.remove(connection)) {
- throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port");
- }
- incomingConnections.add(connection);
- } finally {
- writeLock.unlock();
- }
- }
- }
-
- @Override
- public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
- writeLock.lock();
- try {
- if (!requireNonNull(connection).getSource().equals(this)) {
- final boolean existed = incomingConnections.remove(connection);
- if (!existed) {
- throw new IllegalStateException("The given connection is not currently registered for this ProcessorNode");
- }
- return;
- }
-
- final boolean removed = outgoingConnections.remove(connection);
- if (!removed) {
- throw new IllegalStateException(connection + " is not registered with " + this);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public Set<Connection> getConnections() {
- readLock.lock();
- try {
- return Collections.unmodifiableSet(outgoingConnections);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public Set<Connection> getConnections(final Relationship relationship) {
- readLock.lock();
- try {
- if (relationship.equals(Relationship.ANONYMOUS)) {
- return Collections.unmodifiableSet(outgoingConnections);
- }
-
- throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Funnels");
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public List<Connection> getIncomingConnections() {
- readLock.lock();
- try {
- return new ArrayList<>(incomingConnections);
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public Position getPosition() {
- return position.get();
- }
-
- @Override
- public void setPosition(Position position) {
- this.position.set(position);
- }
-
- @Override
- public String getName() {
- return name.get();
- }
-
- /**
- * Throws {@link UnsupportedOperationException}
- *
- * @param name
- */
- @Override
- public void setName(final String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getComments() {
- return "";
- }
-
- @Override
- public void setComments(final String comments) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessGroup getProcessGroup() {
- return processGroupRef.get();
- }
-
- @Override
- public void setProcessGroup(final ProcessGroup group) {
- processGroupRef.set(group);
- }
-
- @Override
- public boolean isAutoTerminated(Relationship relationship) {
- return false;
- }
-
- @Override
- public boolean isRunning() {
- return isRunning(this);
- }
-
- private boolean isRunning(final Connectable source) {
- return getScheduledState() == ScheduledState.RUNNING;
- }
-
- @Override
- public boolean isTriggerWhenEmpty() {
- return false;
- }
-
- @Override
- public ScheduledState getScheduledState() {
- return scheduledState.get();
- }
-
- @Override
- public boolean isLossTolerant() {
- return lossTolerant.get();
- }
-
- @Override
- public void setLossTolerant(final boolean lossTolerant) {
- this.lossTolerant.set(lossTolerant);
- }
-
- @Override
- public String toString() {
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString();
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
- final ProcessSession session = sessionFactory.createSession();
-
- try {
- onTrigger(context, session);
- session.commit();
- } catch (final ProcessException e) {
- session.rollback();
- throw e;
- } catch (final Throwable t) {
- session.rollback();
- throw new RuntimeException(t);
- }
- }
-
- private void onTrigger(final ProcessContext context, final ProcessSession session) {
- readLock.lock();
- try {
- Set<Relationship> available = context.getAvailableRelationships();
- int transferred = 0;
- while (!available.isEmpty()) {
- final List<FlowFile> flowFiles = session.get(10);
- if (flowFiles.isEmpty()) {
- break;
- }
-
- transferred += flowFiles.size();
- session.transfer(flowFiles, Relationship.ANONYMOUS);
- session.commit();
- available = context.getAvailableRelationships();
- }
-
- if (transferred == 0) {
- context.yield();
- }
- } finally {
- readLock.unlock();
- }
- }
-
- /**
- * Has no effect
- */
- @Override
- public void setMaxConcurrentTasks(int taskCount) {
- }
-
- @Override
- public int getMaxConcurrentTasks() {
- return 1;
- }
-
- @Override
- public void setScheduledState(final ScheduledState scheduledState) {
- this.scheduledState.set(scheduledState);
- }
-
- @Override
- public ConnectableType getConnectableType() {
- return ConnectableType.FUNNEL;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Collection<ValidationResult> getValidationErrors() {
- return Collections.EMPTY_LIST;
- }
-
- /**
- * Updates the amount of time that this processor should avoid being
- * scheduled when the processor calls
- * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
- *
- * @param yieldPeriod
- */
- @Override
- public void setYieldPeriod(final String yieldPeriod) {
- final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
- if (yieldMillis < 0) {
- throw new IllegalArgumentException("Yield duration must be positive");
- }
- this.yieldPeriod.set(yieldPeriod);
- }
-
- /**
- * @param schedulingPeriod
- */
- @Override
- public void setScheduldingPeriod(final String schedulingPeriod) {
- final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
- if (schedulingNanos < 0) {
- throw new IllegalArgumentException("Scheduling Period must be positive");
- }
-
- this.schedulingPeriod.set(schedulingPeriod);
- this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
- }
-
- @Override
- public long getPenalizationPeriod(final TimeUnit timeUnit) {
- return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
- }
-
- @Override
- public String getPenalizationPeriod() {
- return penalizationPeriod.get();
- }
-
- /**
- * Causes the processor not to be scheduled for some period of time. This
- * duration can be obtained and set via the
- * {@link #getYieldPeriod(TimeUnit)} and
- * {@link #setYieldPeriod(long, TimeUnit)} methods.
- */
- @Override
- public void yield() {
- final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
- yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
- }
-
- @Override
- public long getYieldExpiration() {
- return yieldExpiration.get();
- }
-
- @Override
- public String getSchedulingPeriod() {
- return schedulingPeriod.get();
- }
-
- @Override
- public void setPenalizationPeriod(final String penalizationPeriod) {
- this.penalizationPeriod.set(penalizationPeriod);
- }
-
- @Override
- public String getYieldPeriod() {
- return yieldPeriod.get();
- }
-
- @Override
- public long getYieldPeriod(final TimeUnit timeUnit) {
- return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
- }
-
- @Override
- public long getSchedulingPeriod(final TimeUnit timeUnit) {
- return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
- }
-
- @Override
- public boolean isSideEffectFree() {
- return true;
- }
-
- @Override
- public void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException {
- if (ignoreConnections) {
- return;
- }
-
- readLock.lock();
- try {
- for (final Connection connection : outgoingConnections) {
- connection.verifyCanDelete();
- }
-
- for (final Connection connection : incomingConnections) {
- if (connection.getSource().equals(this)) {
- connection.verifyCanDelete();
- } else {
- throw new IllegalStateException(this + " is the destination of another component");
- }
- }
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public void verifyCanDelete() {
- verifyCanDelete(false);
- }
-
- @Override
- public void verifyCanStart() {
- }
-
- @Override
- public void verifyCanStop() {
- }
-
- @Override
- public void verifyCanUpdate() {
- }
-
- @Override
- public void verifyCanEnable() {
- }
-
- @Override
- public void verifyCanDisable() {
- }
-
- @Override
- public SchedulingStrategy getSchedulingStrategy() {
- return SchedulingStrategy.TIMER_DRIVEN;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
deleted file mode 100644
index df3c251..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.Map;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-
-public interface ValidationContextFactory {
-
- ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java
deleted file mode 100644
index 2f43600..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/WorkerQueue.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.connectable.Connectable;
-
-public interface WorkerQueue {
-
- EventBasedWorker poll(long timeout, TimeUnit timeUnit);
-
- void offer(Connectable worker);
-
- void setClustered(boolean clustered);
-
- void setPrimary(boolean primary);
-
- void suspendWork(Connectable worker);
-
- void resumeWork(Connectable worker);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java
deleted file mode 100644
index 368ed1b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/CommunicationsException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-import java.io.IOException;
-
-public class CommunicationsException extends IOException {
-
- private static final long serialVersionUID = 142343242323423L;
-
- public CommunicationsException() {
- super();
- }
-
- public CommunicationsException(final Throwable cause) {
- super(cause);
- }
-
- public CommunicationsException(final String explanation) {
- super(explanation);
- }
-
- public CommunicationsException(final String explanation, final Throwable cause) {
- super(explanation, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
deleted file mode 100644
index 0ff68b0..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-public class ControllerServiceAlreadyExistsException extends RuntimeException {
-
- private static final long serialVersionUID = -544424320587059277L;
-
- /**
- * Constructs a default exception
- * @param id
- */
- public ControllerServiceAlreadyExistsException(final String id) {
- super("A Controller Service already exists with ID " + id);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
deleted file mode 100644
index 4cdbe54..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-public class ControllerServiceNotFoundException extends RuntimeException {
-
- private static final long serialVersionUID = -544424320587059277L;
-
- /**
- * Constructs a default exception
- */
- public ControllerServiceNotFoundException() {
- super();
- }
-
- /**
- * @param message
- */
- public ControllerServiceNotFoundException(String message) {
- super(message);
- }
-
- /**
- * @param cause
- */
- public ControllerServiceNotFoundException(Throwable cause) {
- super(cause);
- }
-
- /**
- * @param message
- * @param cause
- */
- public ControllerServiceNotFoundException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
deleted file mode 100644
index c4aba44..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorInstantiationException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-public class ProcessorInstantiationException extends Exception {
-
- private static final long serialVersionUID = 189273489L;
-
- public ProcessorInstantiationException(final String className, final Throwable t) {
- super(className, t);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java
deleted file mode 100644
index 5acca16..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/exception/ProcessorLifeCycleException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-public class ProcessorLifeCycleException extends RuntimeException {
-
- private static final long serialVersionUID = 8392341500511490941L;
-
- public ProcessorLifeCycleException(final String message, final Throwable t) {
- super(message, t);
- }
-
- public ProcessorLifeCycleException(final Throwable t) {
- super(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java
deleted file mode 100644
index 97c44b5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/label/Label.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.label;
-
-import java.util.Map;
-
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
-import org.apache.nifi.groups.ProcessGroup;
-
-public interface Label {
-
- String getIdentifier();
-
- Position getPosition();
-
- void setPosition(Position position);
-
- Map<String, String> getStyle();
-
- void setStyle(Map<String, String> style);
-
- Size getSize();
-
- void setSize(Size size);
-
- ProcessGroup getProcessGroup();
-
- void setProcessGroup(ProcessGroup group);
-
- String getValue();
-
- void setValue(String value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java
deleted file mode 100644
index ced6ff9..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskInstantiationException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.reporting;
-
-public class ReportingTaskInstantiationException extends Exception {
-
- private static final long serialVersionUID = 189234789237L;
-
- public ReportingTaskInstantiationException(final String className, final Throwable t) {
- super(className, t);
- }
-
- public ReportingTaskInstantiationException(final String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
deleted file mode 100644
index 6ce7ba6..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-
-/**
- *
- * @author none
- */
-public class ContentNotFoundException extends RuntimeException {
-
- private static final long serialVersionUID = 19048239082L;
- private final transient ContentClaim claim;
-
- public ContentNotFoundException(final ContentClaim claim) {
- super("Could not find content for " + claim);
- this.claim = claim;
- }
-
- public ContentNotFoundException(final ContentClaim claim, final Throwable t) {
- super("Could not find content for " + claim, t);
- this.claim = claim;
- }
-
- public ContentNotFoundException(final ContentClaim claim, final String message) {
- super("Could not find content for " + claim + ": " + message);
- this.claim = claim;
- }
-
- public ContentClaim getMissingClaim() {
- return claim;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java
deleted file mode 100644
index de231ed..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/CounterRepository.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.List;
-
-import org.apache.nifi.controller.Counter;
-
-public interface CounterRepository {
-
- void adjustCounter(String counterContext, String name, long delta);
-
- Counter getCounter(String counterContext, String name);
-
- List<Counter> getCounters();
-
- List<Counter> getCounters(String counterContext);
-
- Counter resetCounter(String identifier);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
deleted file mode 100644
index f07a530..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-public interface FlowFileEvent {
-
- String getComponentIdentifier();
-
- int getFlowFilesIn();
-
- int getFlowFilesOut();
-
- int getFlowFilesRemoved();
-
- long getContentSizeIn();
-
- long getContentSizeOut();
-
- long getContentSizeRemoved();
-
- long getBytesRead();
-
- long getBytesWritten();
-
- long getProcessingNanoseconds();
-
- long getAverageLineageMillis();
-
- long getAggregateLineageMillis();
-
- int getFlowFilesReceived();
-
- long getBytesReceived();
-
- int getFlowFilesSent();
-
- long getBytesSent();
-
- int getInvocations();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
deleted file mode 100644
index 2eb3caf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- *
- * @author none
- */
-public interface FlowFileEventRepository extends Closeable {
-
- /**
- * Updates the repository to include a new FlowFile processing event
- *
- * @param event
- * @throws java.io.IOException
- */
- void updateRepository(FlowFileEvent event) throws IOException;
-
- /**
- * Returns a report of processing activity since the given time
- * @param sinceEpochMillis
- * @return
- */
- RepositoryStatusReport reportTransferEvents(long sinceEpochMillis);
-
- /**
- * Causes any flow file events of the given entry age in epoch milliseconds
- * or older to be purged from the repository
- *
- * @param cutoffEpochMilliseconds
- */
- void purgeTransferEvents(long cutoffEpochMilliseconds);
-}