Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/26 15:16:53 UTC

[37/47] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 0000000,071be4d..fe72ae4
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@@ -1,0 -1,1243 +1,1257 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller;
+ 
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.behavior.TriggerSerially;
++import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
+ import org.apache.nifi.controller.ProcessScheduler;
+ import org.apache.nifi.controller.ValidationContextFactory;
+ import org.apache.nifi.controller.ProcessorNode;
++
+ import static java.util.Objects.requireNonNull;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.ConnectableType;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Position;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.logging.LogLevel;
+ import org.apache.nifi.logging.LogRepositoryFactory;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.Processor;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.TriggerSerially;
 -import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.FormatUtils;
 -
+ import org.apache.commons.lang3.builder.EqualsBuilder;
+ import org.apache.commons.lang3.builder.HashCodeBuilder;
+ import org.quartz.CronExpression;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * ProcessorNode provides thread-safe access to a FlowFileProcessor as it exists
+  * within a controlled flow. This node keeps track of the processor, its
+  * scheduling information and its relationships to other processors and whatever
+  * scheduled futures exist for it. Must be thread safe.
+  *
+  * @author none
+  */
+ public class StandardProcessorNode extends ProcessorNode implements Connectable {
+ 
+     public static final String BULLETIN_OBSERVER_ID = "bulletin-observer";
+ 
+     public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+     public static final String DEFAULT_YIELD_PERIOD = "1 sec";
+     public static final String DEFAULT_PENALIZATION_PERIOD = "30 sec";
+     private final AtomicReference<ProcessGroup> processGroup;
+     private final Processor processor;
+     private final AtomicReference<String> identifier;
+     private final Map<Connection, Connectable> destinations;
+     private final Map<Relationship, Set<Connection>> connections;
+     private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate;
+     private final AtomicReference<List<Connection>> incomingConnectionsRef;
+     private final ReentrantReadWriteLock rwLock;
+     private final Lock readLock;
+     private final Lock writeLock;
+     private final AtomicBoolean isolated;
+     private final AtomicBoolean lossTolerant;
+     private final AtomicReference<ScheduledState> scheduledState;
+     private final AtomicReference<String> comments;
+     private final AtomicReference<String> name;
+     private final AtomicReference<Position> position;
+     private final AtomicReference<String> annotationData;
+     private final AtomicReference<String> schedulingPeriod; // stored as string so it's presented to user as they entered it
+     private final AtomicReference<String> yieldPeriod;
+     private final AtomicReference<String> penalizationPeriod;
+     private final AtomicReference<Map<String, String>> style;
+     private final AtomicInteger concurrentTaskCount;
+     private final AtomicLong yieldExpiration;
+     private final AtomicLong schedulingNanos;
+     private final boolean triggerWhenEmpty;
+     private final boolean sideEffectFree;
+     private final boolean triggeredSerially;
+     private final boolean triggerWhenAnyDestinationAvailable;
+     private final boolean eventDrivenSupported;
+     private final boolean batchSupported;
+     private final ValidationContextFactory validationContextFactory;
+     private final ProcessScheduler processScheduler;
+     private long runNanos = 0L;
+ 
+     private SchedulingStrategy schedulingStrategy;  // guarded by read/write lock
+ 
++    @SuppressWarnings("deprecation")
+     StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
+             final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) {
+         super(processor, uuid, validationContextFactory, controllerServiceProvider);
+ 
+         this.processor = processor;
+         identifier = new AtomicReference<>(uuid);
+         destinations = new HashMap<>();
+         connections = new HashMap<>();
+         incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>());
+         scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
+         rwLock = new ReentrantReadWriteLock(false);
+         readLock = rwLock.readLock();
+         writeLock = rwLock.writeLock();
+         lossTolerant = new AtomicBoolean(false);
+         final Set<Relationship> emptySetOfRelationships = new HashSet<>();
+         undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships);
+         comments = new AtomicReference<>("");
+         name = new AtomicReference<>(processor.getClass().getSimpleName());
+         schedulingPeriod = new AtomicReference<>("0 sec");
+         schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
+         yieldPeriod = new AtomicReference<>(DEFAULT_YIELD_PERIOD);
+         yieldExpiration = new AtomicLong(0L);
+         concurrentTaskCount = new AtomicInteger(1);
+         position = new AtomicReference<>(new Position(0D, 0D));
+         style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>()));
+         this.processGroup = new AtomicReference<>();
+         processScheduler = scheduler;
+         annotationData = new AtomicReference<>();
+         isolated = new AtomicBoolean(false);
+         penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
+ 
 -        triggerWhenEmpty = processor.getClass().isAnnotationPresent(TriggerWhenEmpty.class);
 -        sideEffectFree = processor.getClass().isAnnotationPresent(SideEffectFree.class);
 -        batchSupported = processor.getClass().isAnnotationPresent(SupportsBatching.class);
 -        triggeredSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class);
 -        triggerWhenAnyDestinationAvailable = processor.getClass().isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
++        final Class<?> procClass = processor.getClass();
++        triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
++        sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class);
++        batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class);
++        triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class);
++        triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
+         this.validationContextFactory = validationContextFactory;
 -        eventDrivenSupported = processor.getClass().isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty;
++        eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class)) && !triggeredSerially && !triggerWhenEmpty;
+         schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
+     }
+ 
+     /**
+      * @return comments about this specific processor instance
+      */
+     @Override
+     public String getComments() {
+         return comments.get();
+     }
+ 
+     /**
+      * Provides an opportunity to retain information about this particular
+      * processor instance
+      *
+      * @param comments
+      */
+     @Override
+     public void setComments(final String comments) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             this.comments.set(comments);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public ScheduledState getScheduledState() {
+         return scheduledState.get();
+     }
+ 
+     @Override
+     public Position getPosition() {
+         return position.get();
+     }
+ 
+     @Override
+     public void setPosition(Position position) {
+         this.position.set(position);
+     }
+ 
+     public Map<String, String> getStyle() {
+         return style.get();
+     }
+ 
+     public void setStyle(final Map<String, String> style) {
+         if (style != null) {
+             this.style.set(Collections.unmodifiableMap(new HashMap<>(style)));
+         }
+     }
+ 
+     @Override
+     public String getIdentifier() {
+         return identifier.get();
+     }
+ 
+     /**
+      * @return true if flow file content generated by this processor is
+      * considered loss tolerant; false otherwise
+      */
+     @Override
+     public boolean isLossTolerant() {
+         return lossTolerant.get();
+     }
+ 
+     /**
+      * @return true if this processor runs only on the primary node; false otherwise
+      */
+     public boolean isIsolated() {
+         return isolated.get();
+     }
+ 
+     /**
+      * @return true if the processor has the {@link TriggerWhenEmpty}
+      * annotation, false otherwise.
+      */
+     @Override
+     public boolean isTriggerWhenEmpty() {
+         return triggerWhenEmpty;
+     }
+ 
+     /**
+      * @return true if the processor has the {@link SideEffectFree} annotation,
+      * false otherwise.
+      */
+     public boolean isSideEffectFree() {
+         return sideEffectFree;
+     }
+ 
+     @Override
+     public boolean isHighThroughputSupported() {
+         return batchSupported;
+     }
+ 
+     /**
+      * @return true if the processor has the
+      * {@link TriggerWhenAnyDestinationAvailable} annotation, false otherwise.
+      */
+     public boolean isTriggerWhenAnyDestinationAvailable() {
+         return triggerWhenAnyDestinationAvailable;
+     }
+ 
+     /**
+      * Indicates whether flow file content made by this processor must be
+      * persisted
+      *
+      * @param lossTolerant
+      */
+     @Override
+     public void setLossTolerant(final boolean lossTolerant) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             this.lossTolerant.set(lossTolerant);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Indicates whether the processor runs on only the primary node.
+      *
+      * @param isolated
+      */
+     public void setIsolated(final boolean isolated) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             this.isolated.set(isolated);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public boolean isAutoTerminated(final Relationship relationship) {
+         final Set<Relationship> terminatable = undefinedRelationshipsToTerminate.get();
+         if (terminatable == null) {
+             return false;
+         }
+         return terminatable.contains(relationship);
+     }
+ 
+     /**
+      * Indicates whether flow files transferred to undefined relationships
+      * should be terminated
+      *
+      * @param terminate
+      */
+     @Override
+     public void setAutoTerminatedRelationships(final Set<Relationship> terminate) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+ 
+             for (final Relationship rel : terminate) {
+                 if (!getConnections(rel).isEmpty()) {
+                     throw new IllegalStateException("Cannot mark relationship '" + rel.getName() + "' as auto-terminated because Connection already exists with this relationship");
+                 }
+             }
+             undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @return an unmodifiable Set that contains all of the
+      * Relationship objects that are configured to be auto-terminated
+      */
+     public Set<Relationship> getAutoTerminatedRelationships() {
+         Set<Relationship> relationships = undefinedRelationshipsToTerminate.get();
+         if (relationships == null) {
+             relationships = new HashSet<>();
+         }
+         return Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public String getName() {
+         return name.get();
+     }
+ 
+     /**
+      * @return the value of the processor's {@link CapabilityDescription}
+      * annotation, if one exists, else <code>null</code>.
+      */
++    @SuppressWarnings("deprecation")
+     public String getProcessorDescription() {
 -        final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class);
 -        return (capDesc == null) ? null : capDesc.value();
++        CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class);
++        String description = null;
++        if ( capDesc != null ) {
++            description = capDesc.value();
++        } else {
++            final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = 
++                    processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class);
++            if ( deprecatedCapDesc != null ) {
++                description = deprecatedCapDesc.value();
++            }
++        }
++        
++        return description;
+     }
+ 
+     @Override
+     public void setName(final String name) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             this.name.set(name);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @param timeUnit determines the unit of time to represent the scheduling
+      * period. If null, the period will be reported in units of
+      * {@link #DEFAULT_SCHEDULING_TIME_UNIT}
+      * @return the schedule period that should elapse before subsequent cycles
+      * of this processor's tasks
+      */
+     @Override
+     public long getSchedulingPeriod(final TimeUnit timeUnit) {
+         return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
+     }
+ 
+     public boolean isEventDrivenSupported() {
+         readLock.lock();
+         try {
+             return this.eventDrivenSupported;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     /**
+      * Updates the Scheduling Strategy used for this Processor
+      *
+      * @param schedulingStrategy
+      *
+      * @throws IllegalArgumentException if the SchedulingStrategy is not
+      * applicable for this Processor
+      */
+     public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
+         writeLock.lock();
+         try {
+             if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !eventDrivenSupported) {
+                 // not valid. Just ignore it. We don't throw an Exception because if a developer changes a Processor so that
+                 // it no longer supports EventDriven mode, we don't want the app to fail to startup if it was already in Event-Driven
+                 // Mode. Instead, we will simply leave it in Timer-Driven mode
+                 return;
+             }
+ 
+             this.schedulingStrategy = schedulingStrategy;
+             setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Returns the currently configured scheduling strategy
+      *
+      * @return
+      */
+     public SchedulingStrategy getSchedulingStrategy() {
+         readLock.lock();
+         try {
+             return this.schedulingStrategy;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public String getSchedulingPeriod() {
+         return schedulingPeriod.get();
+     }
+ 
+     /**
+      * @param schedulingPeriod the amount of time that should elapse between
+      * scheduling intervals, expressed either as a time period (e.g. "5 sec")
+      * or, when the CRON_DRIVEN strategy is used, as a cron expression
+      */
+     @Override
+     public void setScheduldingPeriod(final String schedulingPeriod) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+ 
+             switch (schedulingStrategy) {
+                 case CRON_DRIVEN: {
+                     try {
+                         new CronExpression(schedulingPeriod);
+                     } catch (final Exception e) {
+                         throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + schedulingPeriod);
+                     }
+                 }
+                 break;
+                 case PRIMARY_NODE_ONLY:
+                 case TIMER_DRIVEN: {
+                     final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
+                     if (schedulingNanos < 0) {
+                         throw new IllegalArgumentException("Scheduling Period must be positive");
+                     }
+                     this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos));
+                 }
+                 break;
+                 case EVENT_DRIVEN:
+                 default:
+                     return;
+             }
+ 
+             this.schedulingPeriod.set(schedulingPeriod);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public long getRunDuration(final TimeUnit timeUnit) {
+         readLock.lock();
+         try {
+             return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void setRunDuration(final long duration, final TimeUnit timeUnit) {
+         writeLock.lock();
+         try {
+             if (duration < 0) {
+                 throw new IllegalArgumentException("Run Duration must be non-negative value; cannot set to " + timeUnit.toSeconds(duration) + " seconds");
+             }
+ 
+             this.runNanos = timeUnit.toNanos(duration);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * @param timeUnit determines the unit of time in which to report the yield
+      * period. If null, the period will be reported in units of
+      * {@link #DEFAULT_TIME_UNIT}.
+      * @return the configured yield period in the given unit of time
+      */
+     @Override
+     public long getYieldPeriod(final TimeUnit timeUnit) {
+         return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+     }
+ 
+     public String getYieldPeriod() {
+         return yieldPeriod.get();
+     }
+ 
+     /**
+      * Updates the amount of time that this processor should avoid being
+      * scheduled when the processor calls
+      * {@link org.apache.nifi.processor.ProcessContext#yield() ProcessContext.yield()}
+      *
+      * @param yieldPeriod
+      */
+     @Override
+     public void setYieldPeriod(final String yieldPeriod) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
+             if (yieldMillis < 0) {
+                 throw new IllegalArgumentException("Yield duration must be positive");
+             }
+             this.yieldPeriod.set(yieldPeriod);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Causes the processor not to be scheduled for some period of time. This
+      * duration can be obtained and set via the
+      * {@link #getYieldPeriod(TimeUnit)} and
+      * {@link #setYieldPeriod(String)} methods.
+      */
+     @Override
+     public void yield() {
+         final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
+         yield(yieldMillis, TimeUnit.MILLISECONDS);
+ 
+         final String yieldDuration = (yieldMillis > 1000) ? (yieldMillis / 1000) + " seconds" : yieldMillis + " milliseconds";
+         LoggerFactory.getLogger(processor.getClass()).debug("{} has chosen to yield its resources; will not be scheduled to run again for {}", processor, yieldDuration);
+     }
+ 
+     public void yield(final long period, final TimeUnit timeUnit) {
+         final long yieldMillis = TimeUnit.MILLISECONDS.convert(period, timeUnit);
+         yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis));
+ 
+         processScheduler.yield(this);
+     }
+ 
+     /**
+      * @return the number of milliseconds since Epoch at which time this
+      * processor is to once again be scheduled.
+      */
+     @Override
+     public long getYieldExpiration() {
+         return yieldExpiration.get();
+     }
+ 
+     @Override
+     public long getPenalizationPeriod(final TimeUnit timeUnit) {
+         return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit);
+     }
+ 
+     @Override
+     public String getPenalizationPeriod() {
+         return penalizationPeriod.get();
+     }
+ 
+     @Override
+     public void setPenalizationPeriod(final String penalizationPeriod) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS);
+             if (penalizationMillis < 0) {
+                 throw new IllegalArgumentException("Penalization duration must be positive");
+             }
+             this.penalizationPeriod.set(penalizationPeriod);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     /**
+      * Sets the maximum number of concurrent tasks that may be running for this
+      * processor.
+      *
+      * @param taskCount a number of concurrent tasks this processor may have
+      * running
+      * @throws IllegalArgumentException if the given value is less than 1
+      */
+     @Override
+     public void setMaxConcurrentTasks(final int taskCount) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+             }
+             if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) {
+                 throw new IllegalArgumentException();
+             }
+             if (!triggeredSerially) {
+                 concurrentTaskCount.set(taskCount);
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     public boolean isTriggeredSerially() {
+         return triggeredSerially;
+     }
+ 
+     /**
+      * @return the number of tasks that may execute concurrently for this
+      * processor
+      */
+     @Override
+     public int getMaxConcurrentTasks() {
+         return concurrentTaskCount.get();
+     }
+ 
+     public LogLevel getBulletinLevel() {
+         return LogRepositoryFactory.getRepository(getIdentifier()).getObservationLevel(BULLETIN_OBSERVER_ID);
+     }
+ 
+     public void setBulletinLevel(final LogLevel level) {
+         LogRepositoryFactory.getRepository(getIdentifier()).setObservationLevel(BULLETIN_OBSERVER_ID, level);
+     }
+ 
+     @Override
+     public Set<Connection> getConnections() {
+         final Set<Connection> allConnections = new HashSet<>();
+         readLock.lock();
+         try {
+             for (final Set<Connection> connectionSet : connections.values()) {
+                 allConnections.addAll(connectionSet);
+             }
+         } finally {
+             readLock.unlock();
+         }
+ 
+         return allConnections;
+     }
+ 
+     @Override
+     public List<Connection> getIncomingConnections() {
+         return incomingConnectionsRef.get();
+     }
+ 
+     @Override
+     public Set<Connection> getConnections(final Relationship relationship) {
+         final Set<Connection> applicableConnections;
+         readLock.lock();
+         try {
+             applicableConnections = connections.get(relationship);
+         } finally {
+             readLock.unlock();
+         }
+         return (applicableConnections == null) ? Collections.<Connection>emptySet() : Collections.unmodifiableSet(applicableConnections);
+     }
+ 
+     @Override
+     public void addConnection(final Connection connection) {
+         Objects.requireNonNull(connection, "connection cannot be null");
+ 
+         if (!connection.getSource().equals(this) && !connection.getDestination().equals(this)) {
+             throw new IllegalStateException("Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination");
+         }
+ 
+         writeLock.lock();
+         try {
+             List<Connection> updatedIncoming = null;
+             if (connection.getDestination().equals(this)) {
+                 // don't add the connection twice. This may occur if we have a self-loop because we will be told
+                 // to add the connection once because we are the source and again because we are the destination.
+                 final List<Connection> incomingConnections = incomingConnectionsRef.get();
+                 updatedIncoming = new ArrayList<>(incomingConnections);
+                 if (!updatedIncoming.contains(connection)) {
+                     updatedIncoming.add(connection);
+                 }
+             }
+ 
+             if (connection.getSource().equals(this)) {
+                 // don't add the connection twice. This may occur if we have a self-loop because we will be told
+                 // to add the connection once because we are the source and again because we are the destination.
+                 if (!destinations.containsKey(connection)) {
+                     for (final Relationship relationship : connection.getRelationships()) {
+                         final Relationship rel = getRelationship(relationship.getName());
+                         Set<Connection> set = connections.get(rel);
+                         if (set == null) {
+                             set = new HashSet<>();
+                             connections.put(rel, set);
+                         }
+ 
+                         set.add(connection);
+ 
+                         destinations.put(connection, connection.getDestination());
+                     }
+ 
+                     final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get();
+                     if (autoTerminated != null) {
+                         autoTerminated.removeAll(connection.getRelationships());
+                         this.undefinedRelationshipsToTerminate.set(autoTerminated);
+                     }
+                 }
+             }
+ 
+             if (updatedIncoming != null) {
+                 incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
+             }
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public boolean hasIncomingConnection() {
+         return !incomingConnectionsRef.get().isEmpty();
+     }
+ 
+     @Override
+     public void updateConnection(final Connection connection) throws IllegalStateException {
+         if (requireNonNull(connection).getSource().equals(this)) {
+             writeLock.lock();
+             try {
+                 //
+                 // update any relationships
+                 //
+                 // first check if any relations were removed.
+                 final List<Relationship> existingRelationships = new ArrayList<>();
+                 for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) {
+                     if (entry.getValue().contains(connection)) {
+                         existingRelationships.add(entry.getKey());
+                     }
+                 }
+ 
+                 for (final Relationship rel : connection.getRelationships()) {
+                     if (!existingRelationships.contains(rel)) {
+                         // relationship was removed. Check if this is legal.
+                         final Set<Connection> connectionsForRelationship = getConnections(rel);
+                         if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() && !isAutoTerminated(rel) && getRelationships().contains(rel)) {
+                             // if we are running and we do not terminate undefined relationships and this is the only
+                             // connection that defines the given relationship, and that relationship is required,
+                             // then it is not legal to remove this relationship from this connection.
+                             throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + this + ", which is currently running");
+                         }
+                     }
+                 }
+ 
+                 // remove the connection from any list that currently contains it
+                 for (final Set<Connection> list : connections.values()) {
+                     list.remove(connection);
+                 }
+ 
+                 // add the connection in for all relationships listed.
+                 for (final Relationship rel : connection.getRelationships()) {
+                     Set<Connection> set = connections.get(rel);
+                     if (set == null) {
+                         set = new HashSet<>();
+                         connections.put(rel, set);
+                     }
+                     set.add(connection);
+                 }
+ 
+                 // update to the new destination
+                 destinations.put(connection, connection.getDestination());
+ 
+                 final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get();
+                 if (autoTerminated != null) {
+                     autoTerminated.removeAll(connection.getRelationships());
+                     this.undefinedRelationshipsToTerminate.set(autoTerminated);
+                 }
+             } finally {
+                 writeLock.unlock();
+             }
+         }
+ 
+         if (connection.getDestination().equals(this)) {
+             writeLock.lock();
+             try {
+                 // update our incoming connections -- we can just remove & re-add the connection to
+                 // update the list.
+                 final List<Connection> incomingConnections = incomingConnectionsRef.get();
+                 final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections);
+                 updatedIncoming.remove(connection);
+                 updatedIncoming.add(connection);
+                 incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
+             } finally {
+                 writeLock.unlock();
+             }
+         }
+     }
+ 
+     @Override
+     public void removeConnection(final Connection connection) {
+         boolean connectionRemoved = false;
+ 
+         if (requireNonNull(connection).getSource().equals(this)) {
+             for (final Relationship relationship : connection.getRelationships()) {
+                 final Set<Connection> connectionsForRelationship = getConnections(relationship);
+                 if ((connectionsForRelationship == null || connectionsForRelationship.size() <= 1) && isRunning()) {
+                     throw new IllegalStateException("This connection cannot be removed because its source is running and removing it will invalidate this processor");
+                 }
+             }
+ 
+             writeLock.lock();
+             try {
+                 for (final Set<Connection> connectionList : this.connections.values()) {
+                     connectionList.remove(connection);
+                 }
+ 
+                 connectionRemoved = (destinations.remove(connection) != null);
+             } finally {
+                 writeLock.unlock();
+             }
+         }
+ 
+         if (connection.getDestination().equals(this)) {
+             writeLock.lock();
+             try {
+                 final List<Connection> incomingConnections = incomingConnectionsRef.get();
+                 if (incomingConnections.contains(connection)) {
+                     final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections);
+                     updatedIncoming.remove(connection);
+                     incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming));
+                     return;
+                 }
+             } finally {
+                 writeLock.unlock();
+             }
+         }
+ 
+         if (!connectionRemoved) {
+             throw new IllegalArgumentException("Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source");
+         }
+     }
+ 
+     /**
+      * Gets the relationship for this node's processor for the given name or
+      * creates a new relationship for the given name.
+      *
+      * @param relationshipName
+      * @return
+      */
+     @Override
+     public Relationship getRelationship(final String relationshipName) {
+         final Relationship specRel = new Relationship.Builder().name(relationshipName).build();
+         Relationship returnRel = specRel;
+ 
+         final Set<Relationship> relationships;
+         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+             relationships = processor.getRelationships();
+         }
+ 
+         for (final Relationship rel : relationships) {
+             if (rel.equals(specRel)) {
+                 returnRel = rel;
+                 break;
+             }
+         }
+         return returnRel;
+     }
+ 
+     public Processor getProcessor() {
+         return this.processor;
+     }
+ 
+     /**
+      * Obtains the Set of destination processors for all relationships excluding
+      * any destinations that are this processor itself (self-loops)
+      *
+      * @return
+      */
+     public Set<Connectable> getDestinations() {
+         final Set<Connectable> nonSelfDestinations = new HashSet<>();
+         readLock.lock();
+         try {
+             for (final Connectable connectable : destinations.values()) {
+                 if (connectable != this) {
+                     nonSelfDestinations.add(connectable);
+                 }
+             }
+         } finally {
+             readLock.unlock();
+         }
+         return nonSelfDestinations;
+     }
+ 
+     public Set<Connectable> getDestinations(final Relationship relationship) {
+         readLock.lock();
+         try {
+             final Set<Connectable> destinationSet = new HashSet<>();
+             final Set<Connection> relationshipConnections = connections.get(relationship);
+             if (relationshipConnections != null) {
+                 for (final Connection connection : relationshipConnections) {
+                     destinationSet.add(destinations.get(connection));
+                 }
+             }
+             return destinationSet;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     public Set<Relationship> getUndefinedRelationships() {
+         final Set<Relationship> undefined = new HashSet<>();
+         readLock.lock();
+         try {
+             final Set<Relationship> relationships;
+             try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                 relationships = processor.getRelationships();
+             }
+ 
+             if (relationships == null) {
+                 return undefined;
+             }
+             for (final Relationship relation : relationships) {
+                 final Set<Connection> connectionSet = this.connections.get(relation);
+                 if (connectionSet == null || connectionSet.isEmpty()) {
+                     undefined.add(relation);
+                 }
+             }
+         } finally {
+             readLock.unlock();
+         }
+         return undefined;
+     }
+ 
+     /**
+      * Determines if the given node is a destination for this node
+      *
+      * @param node
+      * @return true if it is a direct destination node; false otherwise
+      */
+     boolean isRelated(final ProcessorNode node) {
+         readLock.lock();
+         try {
+             return this.destinations.containsValue(node);
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public boolean isRunning() {
+         readLock.lock();
+         try {
+             return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0;
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public boolean isValid() {
+         readLock.lock();
+         try {
+             final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
+ 
+             final Collection<ValidationResult> validationResults;
+             try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                 validationResults = getProcessor().validate(validationContext);
+             }
+ 
+             for (final ValidationResult result : validationResults) {
+                 if (!result.isValid()) {
+                     return false;
+                 }
+             }
+ 
+             for (final Relationship undef : getUndefinedRelationships()) {
+                 if (!isAutoTerminated(undef)) {
+                     return false;
+                 }
+             }
+         } catch (final Throwable t) {
+             return false;
+         } finally {
+             readLock.unlock();
+         }
+ 
+         return true;
+     }
+ 
+     @Override
+     public Collection<ValidationResult> getValidationErrors() {
+         final List<ValidationResult> results = new ArrayList<>();
+         readLock.lock();
+         try {
+             final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData());
+ 
+             final Collection<ValidationResult> validationResults;
+             try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                 validationResults = getProcessor().validate(validationContext);
+             }
+ 
+             for (final ValidationResult result : validationResults) {
+                 if (!result.isValid()) {
+                     results.add(result);
+                 }
+             }
+ 
+             for (final Relationship relationship : getUndefinedRelationships()) {
+                 if (!isAutoTerminated(relationship)) {
+                     final ValidationResult error = new ValidationResult.Builder()
+                             .explanation("Relationship '" + relationship.getName() + "' is not connected to any component and is not auto-terminated")
+                             .subject("Relationship " + relationship.getName())
+                             .valid(false)
+                             .build();
+                     results.add(error);
+                 }
+             }
+         } catch (final Throwable t) {
+             results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
+         } finally {
+             readLock.unlock();
+         }
+         return results;
+     }
+ 
+     /**
+      * Establishes node equality (based on the processor's identifier)
+      *
+      * @param other
+      * @return
+      */
+     @Override
+     public boolean equals(final Object other) {
+         if (!(other instanceof ProcessorNode)) {
+             return false;
+         }
+         final ProcessorNode on = (ProcessorNode) other;
+         return new EqualsBuilder().append(identifier.get(), on.getIdentifier()).isEquals();
+     }
+ 
+     @Override
+     public int hashCode() {
+         return new HashCodeBuilder(7, 67).append(identifier.get()).toHashCode();
+     }
+ 
+     @Override
+     public Collection<Relationship> getRelationships() {
+         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+             return getProcessor().getRelationships();
+         }
+     }
+ 
+     @Override
+     public String toString() {
+         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+             return getProcessor().toString();
+         }
+     }
+ 
+     @Override
+     public ProcessGroup getProcessGroup() {
+         return processGroup.get();
+     }
+ 
+     @Override
+     public void setProcessGroup(final ProcessGroup group) {
+         writeLock.lock();
+         try {
+             this.processGroup.set(group);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
+         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+             processor.onTrigger(context, sessionFactory);
+         }
+     }
+ 
+     @Override
+     public ConnectableType getConnectableType() {
+         return ConnectableType.PROCESSOR;
+     }
+ 
+     public void setScheduledState(final ScheduledState scheduledState) {
+         this.scheduledState.set(scheduledState);
+         if (!scheduledState.equals(ScheduledState.RUNNING)) {   // if user stops processor, clear yield expiration
+             yieldExpiration.set(0L);
+         }
+     }
+ 
+     @Override
+     public void setAnnotationData(final String data) {
+         writeLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException("Cannot set AnnotationData while processor is running");
+             }
+ 
+             this.annotationData.set(data);
+         } finally {
+             writeLock.unlock();
+         }
+     }
+ 
+     @Override
+     public String getAnnotationData() {
+         return annotationData.get();
+     }
+ 
+     @Override
+     public Collection<ValidationResult> validate(final ValidationContext validationContext) {
+         return processor.validate(validationContext);
+     }
+ 
+     @Override
+     public void verifyCanDelete() throws IllegalStateException {
+         verifyCanDelete(false);
+     }
+ 
+     @Override
+     public void verifyCanDelete(final boolean ignoreConnections) {
+         readLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException(this + " is running");
+             }
+ 
+             if (!ignoreConnections) {
+                 for (final Set<Connection> connectionSet : connections.values()) {
+                     for (final Connection connection : connectionSet) {
+                         connection.verifyCanDelete();
+                     }
+                 }
+ 
+                 for (final Connection connection : incomingConnectionsRef.get()) {
+                     if (connection.getSource().equals(this)) {
+                         connection.verifyCanDelete();
+                     } else {
+                         throw new IllegalStateException(this + " is the destination of another component");
+                     }
+                 }
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void verifyCanStart() {
+         readLock.lock();
+         try {
+             if (scheduledState.get() != ScheduledState.STOPPED) {
+                 throw new IllegalStateException(this + " is not stopped");
+             }
+             verifyNoActiveThreads();
+ 
+             if (!isValid()) {
+                 throw new IllegalStateException(this + " is not in a valid state");
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void verifyCanStop() {
+         if (getScheduledState() != ScheduledState.RUNNING) {
+             throw new IllegalStateException(this + " is not scheduled to run");
+         }
+     }
+ 
+     @Override
+     public void verifyCanUpdate() {
+         readLock.lock();
+         try {
+             if (isRunning()) {
+                 throw new IllegalStateException(this + " is not stopped");
+             }
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void verifyCanEnable() {
+         readLock.lock();
+         try {
+             if (getScheduledState() != ScheduledState.DISABLED) {
+                 throw new IllegalStateException(this + " is not disabled");
+             }
+ 
+             verifyNoActiveThreads();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void verifyCanDisable() {
+         readLock.lock();
+         try {
+             if (getScheduledState() != ScheduledState.STOPPED) {
+                 throw new IllegalStateException(this + " is not stopped");
+             }
+             verifyNoActiveThreads();
+         } finally {
+             readLock.unlock();
+         }
+     }
+ 
+     private void verifyNoActiveThreads() throws IllegalStateException {
+         final int threadCount = processScheduler.getActiveThreadCount(this);
+         if (threadCount > 0) {
+             throw new IllegalStateException(this + " has " + threadCount + " threads still active");
+         }
+     }
+ 
+     @Override
+     public void verifyModifiable() throws IllegalStateException {
+         if (isRunning()) {
+             throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+         }
+     }
+ }
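
The StandardProcessorNode changes above replace each direct check against the deprecated
org.apache.nifi.processor.annotation types with a lookup that accepts either the new
org.apache.nifi.annotation.* type or its deprecated org.apache.nifi.processor.annotation
counterpart. A minimal stand-alone sketch of that dual-lookup pattern is shown below; the
AnnotationCompat class and its hasEither method are illustrative names only and are not
part of this commit.

    import java.lang.annotation.Annotation;

    final class AnnotationCompat {

        private AnnotationCompat() {
        }

        /**
         * Returns true if the component class carries either the current annotation
         * or its deprecated predecessor, mirroring the fallback used in the
         * StandardProcessorNode constructor above.
         */
        static boolean hasEither(final Class<?> componentClass,
                final Class<? extends Annotation> currentAnnotation,
                final Class<? extends Annotation> deprecatedAnnotation) {
            return componentClass.isAnnotationPresent(currentAnnotation)
                    || componentClass.isAnnotationPresent(deprecatedAnnotation);
        }
    }

    // Hypothetical usage, following the constructor logic above:
    // triggerWhenEmpty = AnnotationCompat.hasEither(procClass,
    //         org.apache.nifi.annotation.behavior.TriggerWhenEmpty.class,
    //         org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);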

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 0000000,6c27470..7c3734a
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@@ -1,0 -1,111 +1,210 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.reporting;
+ 
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.controller.AbstractConfiguredComponent;
+ import org.apache.nifi.controller.Availability;
+ import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessScheduler;
+ import org.apache.nifi.controller.ReportingTaskNode;
++import org.apache.nifi.controller.ScheduledState;
+ import org.apache.nifi.controller.ValidationContextFactory;
++import org.apache.nifi.controller.annotation.OnConfigured;
++import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.controller.service.StandardConfigurationContext;
++import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.reporting.ReportingTask;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.FormatUtils;
++import org.apache.nifi.util.ReflectionUtils;
+ 
+ public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
+ 
+     private final ReportingTask reportingTask;
+     private final ProcessScheduler processScheduler;
+     private final ControllerServiceLookup serviceLookup;
+ 
+     private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
+     private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
+     private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
+ 
++    private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
++    
+     public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
+             final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
+             final ValidationContextFactory validationContextFactory) {
+         super(reportingTask, id, validationContextFactory, controllerServiceProvider);
+         this.reportingTask = reportingTask;
+         this.processScheduler = processScheduler;
+         this.serviceLookup = controllerServiceProvider;
+     }
+ 
+     @Override
+     public Availability getAvailability() {
+         return availability.get();
+     }
+ 
+     @Override
+     public void setAvailability(final Availability availability) {
+         this.availability.set(availability);
+     }
+ 
+     @Override
+     public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
+         this.schedulingStrategy.set(schedulingStrategy);
+     }
+ 
+     @Override
+     public SchedulingStrategy getSchedulingStrategy() {
+         return schedulingStrategy.get();
+     }
+ 
+     @Override
+     public String getSchedulingPeriod() {
+         return schedulingPeriod.get();
+     }
+ 
+     @Override
+     public long getSchedulingPeriod(final TimeUnit timeUnit) {
+         return FormatUtils.getTimeDuration(schedulingPeriod.get(), timeUnit);
+     }
+ 
+     @Override
+     public void setScheduldingPeriod(final String schedulingPeriod) {
+         this.schedulingPeriod.set(schedulingPeriod);
+     }
+ 
+     @Override
+     public ReportingTask getReportingTask() {
+         return reportingTask;
+     }
+ 
+     @Override
+     public boolean isRunning() {
+         return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0;
+     }
+ 
+     @Override
+     public ConfigurationContext getConfigurationContext() {
+         return new StandardConfigurationContext(this, serviceLookup);
+     }
+ 
+     @Override
+     public void verifyModifiable() throws IllegalStateException {
+         if (isRunning()) {
+             throw new IllegalStateException("Cannot modify Reporting Task while the Reporting Task is running");
+         }
+     }
+ 
++    @Override
++    public ScheduledState getScheduledState() {
++        return scheduledState;
++    }
++    
++    @Override
++    public void setScheduledState(final ScheduledState state) {
++        this.scheduledState = state;
++    }
++    
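++    // Changing or removing a property re-invokes the task's deprecated @OnConfigured lifecycle methods; see onConfigured() below.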
++    @Override
++    public void setProperty(final String name, final String value) {
++        super.setProperty(name, value);
++        
++        onConfigured();
++    }
++    
++    @Override
++    public boolean removeProperty(final String name) {
++        final boolean removed = super.removeProperty(name);
++        if ( removed ) {
++            onConfigured();
++        }
++        
++        return removed;
++    }
++    
++    private void onConfigured() {
++        // We need to invoke any method annotated with the OnConfigured annotation in order to
++        // maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
++        try (final NarCloseable x = NarCloseable.withNarLoader()) {
++            final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
++            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
++        } catch (final Exception e) {
++            throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
++        }
++    }
++    
++    public boolean isDisabled() {
++        return scheduledState == ScheduledState.DISABLED;
++    }
++    
++    @Override
++    public void verifyCanDelete() {
++        if (isRunning()) {
++            throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running");
++        }
++    }
++    
++    @Override
++    public void verifyCanDisable() {
++        if ( isRunning() ) {
++            throw new IllegalStateException("Cannot disable " + reportingTask + " because it is currently running");
++        }
++        
++        if ( isDisabled() ) {
++            throw new IllegalStateException("Cannot disable " + reportingTask + " because it is already disabled");
++        }
++    }
++    
++    
++    @Override
++    public void verifyCanEnable() {
++        if ( !isDisabled() ) {
++            throw new IllegalStateException("Cannot enable " + reportingTask + " because it is not disabled");
++        }
++    }
++    
++    @Override
++    public void verifyCanStart() {
++        if ( isDisabled() ) {
++            throw new IllegalStateException("Cannot start " + reportingTask + " because it is currently disabled");
++        }
++        
++        if ( isRunning() ) {
++            throw new IllegalStateException("Cannot start " + reportingTask + " because it is already running");
++        }
++    }
++    
++    @Override
++    public void verifyCanStop() {
++        if ( !isRunning() ) {
++            throw new IllegalStateException("Cannot stop " + reportingTask + " because it is not running");
++        }
++    }
++    
++    @Override
++    public void verifyCanUpdate() {
++        if ( isRunning() ) {
++            throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
++        }
++    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 0000000,af801bb..7455bf8
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@@ -1,0 -1,346 +1,346 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.scheduling;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.controller.EventBasedWorker;
+ import org.apache.nifi.controller.EventDrivenWorkerQueue;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.ReportingTaskNode;
+ import org.apache.nifi.controller.repository.BatchingSessionFactory;
+ import org.apache.nifi.controller.repository.ProcessContext;
+ import org.apache.nifi.controller.repository.StandardFlowFileEvent;
+ import org.apache.nifi.controller.repository.StandardProcessSession;
+ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.engine.FlowEngine;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
 -import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.util.Connectables;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.ReflectionUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class EventDrivenSchedulingAgent implements SchedulingAgent {
+ 
+     private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
+ 
+     private final FlowEngine flowEngine;
+     private final ControllerServiceProvider controllerServiceProvider;
+     private final EventDrivenWorkerQueue workerQueue;
+     private final ProcessContextFactory contextFactory;
+     private final AtomicInteger maxThreadCount;
+     private final StringEncryptor encryptor;
+ 
+     private volatile String adminYieldDuration = "1 sec";
+ 
+     private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>();
+     private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
+ 
+     public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
+         this.flowEngine = flowEngine;
+         this.controllerServiceProvider = flowController;
+         this.workerQueue = workerQueue;
+         this.contextFactory = contextFactory;
+         this.maxThreadCount = new AtomicInteger(maxThreadCount);
+         this.encryptor = encryptor;
+ 
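+         // schedule one long-lived polling task per permitted thread; each task repeatedly pulls
+         // work from the event-driven worker queue until the flow engine is shut down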
+         for (int i = 0; i < maxThreadCount; i++) {
+             final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
+             flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS);
+         }
+     }
+ 
+     @Override
+     public void shutdown() {
+         flowEngine.shutdown();
+     }
+ 
+     @Override
+     public void schedule(final ReportingTaskNode taskNode, ScheduleState scheduleState) {
+         throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
+     }
+ 
+     @Override
+     public void unschedule(ReportingTaskNode taskNode, ScheduleState scheduleState) {
+         throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
+     }
+ 
+     @Override
+     public void schedule(final Connectable connectable, final ScheduleState scheduleState) {
+         workerQueue.resumeWork(connectable);
+         logger.info("Scheduled {} to run in Event-Driven mode", connectable);
+         scheduleStates.put(connectable, scheduleState);
+     }
+ 
+     @Override
+     public void unschedule(final Connectable connectable, final ScheduleState scheduleState) {
+         workerQueue.suspendWork(connectable);
+         logger.info("Stopped scheduling {} to run", connectable);
+     }
+ 
+     @Override
+     public void onEvent(final Connectable connectable) {
+         workerQueue.offer(connectable);
+     }
+ 
+     @Override
+     public void setMaxThreadCount(final int maxThreadCount) {
+         final int oldMax = this.maxThreadCount.getAndSet(maxThreadCount);
+         if (maxThreadCount > oldMax) {
+             // if more threads have been allocated, add more tasks to the work queue
+             final int tasksToAdd = maxThreadCount - oldMax;
+             for (int i = 0; i < tasksToAdd; i++) {
+                 final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
+                 flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS);
+             }
+         }
+     }
+ 
+     @Override
+     public void setAdministrativeYieldDuration(final String yieldDuration) {
+         this.adminYieldDuration = yieldDuration;
+     }
+ 
+     @Override
+     public String getAdministrativeYieldDuration() {
+         return adminYieldDuration;
+     }
+ 
+     @Override
+     public long getAdministrativeYieldDuration(final TimeUnit timeUnit) {
+         return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit);
+     }
+ 
+     private class EventDrivenTask implements Runnable {
+ 
+         private final EventDrivenWorkerQueue workerQueue;
+ 
+         public EventDrivenTask(final EventDrivenWorkerQueue workerQueue) {
+             this.workerQueue = workerQueue;
+         }
+ 
+         @Override
+         public void run() {
+             while (!flowEngine.isShutdown()) {
+                 final EventBasedWorker worker = workerQueue.poll(1, TimeUnit.SECONDS);
+                 if (worker == null) {
+                     continue;
+                 }
+                 final Connectable connectable = worker.getConnectable();
+                 final ScheduleState scheduleState = scheduleStates.get(connectable);
+                 if (scheduleState == null) {
+                     // Component not yet scheduled to run but has received events
+                     continue;
+                 }
+ 
+                 // get the connection index for this worker
+                 AtomicLong connectionIndex = connectionIndexMap.get(connectable);
+                 if (connectionIndex == null) {
+                     connectionIndex = new AtomicLong(0L);
+                     final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex);
+                     if (existingConnectionIndex != null) {
+                         connectionIndex = existingConnectionIndex;
+                     }
+                 }
+ 
+                 final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex);
+ 
+                 if (connectable instanceof ProcessorNode) {
+                     final ProcessorNode procNode = (ProcessorNode) connectable;
+                     final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+ 
+                     final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
+                     final ProcessSessionFactory sessionFactory;
+                     final StandardProcessSession rawSession;
+                     final boolean batch;
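+                     // if the processor supports high-throughput (batched) operation and a run duration is configured,
+                     // share a single session whose commit is deferred to the finally block below; otherwise create a
+                     // fresh session for each invocation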
+                     if (procNode.isHighThroughputSupported() && runNanos > 0L) {
+                         rawSession = new StandardProcessSession(context);
+                         sessionFactory = new BatchingSessionFactory(rawSession);
+                         batch = true;
+                     } else {
+                         rawSession = null;
+                         sessionFactory = new StandardProcessSessionFactory(context);
+                         batch = false;
+                     }
+ 
+                     final long startNanos = System.nanoTime();
+                     final long finishNanos = startNanos + runNanos;
+                     int invocationCount = 0;
+                     boolean shouldRun = true;
+ 
+                     try {
+                         while (shouldRun) {
+                             trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory);
+                             invocationCount++;
+ 
+                             if (!batch) {
+                                 break;
+                             }
+                             if (System.nanoTime() > finishNanos) {
+                                 break;
+                             }
+                             if (!scheduleState.isScheduled()) {
+                                 break;
+                             }
+ 
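+                             // each invocation consumes one queued event; if the count dips below zero, restore it,
+                             // and keep looping only while events remain queued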
+                             final int eventCount = worker.decrementEventCount();
+                             if (eventCount < 0) {
+                                 worker.incrementEventCount();
+                             }
+                             shouldRun = (eventCount > 0);
+                         }
+                     } finally {
+                         if (batch && rawSession != null) {
+                             try {
+                                 rawSession.commit();
+                             } catch (final RuntimeException re) {
+                                 logger.error("Unable to commit process session", re);
+                             }
+                         }
+                         try {
+                             final long processingNanos = System.nanoTime() - startNanos;
+                             final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
+                             procEvent.setProcessingNanos(processingNanos);
+                             procEvent.setInvocations(invocationCount);
+                             context.getFlowFileEventRepository().updateRepository(procEvent);
+                         } catch (final IOException e) {
+                             logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
+                             logger.error("", e);
+                         }
+                     }
+ 
+                     // If the Processor has FlowFiles, go ahead and register it to run again.
+                     // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine,
+                     // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything
+                     // off of its input queue.
+                     // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and
+                     // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this
+                     // point is to register the Processor to run again.
+                     if (Connectables.flowFilesQueued(procNode)) {
+                         onEvent(procNode);
+                     }
+                 } else {
+                     final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context);
+                     final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor);
+                     trigger(connectable, scheduleState, connectableProcessContext, sessionFactory);
+ 
+                     // See explanation above for the ProcessorNode as to why we do this.
+                     if (Connectables.flowFilesQueued(connectable)) {
+                         onEvent(connectable);
+                     }
+                 }
+             }
+         }
+ 
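++        // deprecation warnings are suppressed because the deprecated org.apache.nifi.processor.annotation.OnStopped
++        // annotation is still honored below for backward compatibility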
++        @SuppressWarnings("deprecation")
+         private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
+             final int newThreadCount = scheduleState.incrementActiveThreadCount();
+             if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
+                  // It's possible that the worker queue could give us a worker node that is eligible to run based
+                  // on the number of threads, but another thread has already incremented the thread count, resulting in
+                  // reaching the maximum number of threads. We won't know this until we atomically increment the thread count
+                  // on the Schedule State, so we check it here. In this case, we cannot trigger the Processor, as doing so would
+                  // result in using more than the maximum number of defined threads.
+                  scheduleState.decrementActiveThreadCount();
+                  return;
+             }
+             
+             try {
+                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+                     worker.onTrigger(processContext, sessionFactory);
+                 } catch (final ProcessException pe) {
+                     logger.error("{} failed to process session due to {}", worker, pe.toString());
+                 } catch (final Throwable t) {
+                     logger.error("{} failed to process session due to {}", worker, t.toString());
+                     logger.error("", t);
+ 
+                     logger.warn("{} Administratively Pausing for {} due to processing failure: {}", worker, getAdministrativeYieldDuration(), t.toString());
+                     logger.warn("", t);
+                     try {
+                         Thread.sleep(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.MILLISECONDS));
+                     } catch (final InterruptedException e) {
+                         // preserve the interrupt status rather than swallowing it
+                         Thread.currentThread().interrupt();
+                     }
+ 
+                 }
+             } finally {
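+                 // if the component is no longer scheduled to run and this is the last active thread,
+                 // invoke the OnStopped methods (both the current and the deprecated annotations are honored)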
+                 if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+                     try (final NarCloseable x = NarCloseable.withNarLoader()) {
 -                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext);
++                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, worker, processContext);
+                     }
+                 }
+ 
+                 scheduleState.decrementActiveThreadCount();
+             }
+         }
+ 
+         private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
+             final int newThreadCount = scheduleState.incrementActiveThreadCount();
+             if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
+                  // It's possible that the worker queue could give us a worker node that is eligible to run based
+                  // on the number of threads, but another thread has already incremented the thread count, resulting in
+                  // reaching the maximum number of threads. We won't know this until we atomically increment the thread count
+                  // on the Schedule State, so we check it here. In this case, we cannot trigger the Processor, as doing so would
+                  // result in using more than the maximum number of defined threads.
+                  scheduleState.decrementActiveThreadCount();
+                  return;
+             }
+             
+             try {
+                 try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+                     worker.onTrigger(processContext, sessionFactory);
+                 } catch (final ProcessException pe) {
+                     final ProcessorLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
+                     procLog.error("Failed to process session due to {}", new Object[]{pe});
+                 } catch (final Throwable t) {
+                     // Use ProcessorLog to log the event so that a bulletin will be created for this processor
+                     final ProcessorLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
+                     procLog.error("{} failed to process session due to {}", new Object[]{worker.getProcessor(), t});
+                     procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{adminYieldDuration});
+                     logger.warn("Administratively Yielding {} due to uncaught Exception: ", worker.getProcessor());
+                     logger.warn("", t);
+ 
+                     worker.yield(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+                 }
+             } finally {
+                 // if the processor is no longer scheduled to run and this is the last thread,
+                 // invoke the OnStopped methods
+                 if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+                     try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker.getProcessor(), processContext);
+                     }
+                 }
+ 
+                 scheduleState.decrementActiveThreadCount();
+             }
+         }
+     }
+ }