You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/03/24 15:11:01 UTC

[14/17] nifi git commit: NIFI-3380 Bumping NAR plugin to 1.2.0-SNAPSHOT development to leverage changes from master, adding buildnumber-maven-plugin to nifi-nar-bundles to properly set build info in MANIFEST of NARs - Refactoring NarDetails to include al

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 191fc65..f312096 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.DataAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@@ -63,6 +65,7 @@ import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
 import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
@@ -105,6 +108,7 @@ import org.apache.nifi.controller.serialization.FlowSerializationException;
 import org.apache.nifi.controller.serialization.FlowSerializer;
 import org.apache.nifi.controller.serialization.FlowSynchronizationException;
 import org.apache.nifi.controller.serialization.FlowSynchronizer;
+import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
@@ -150,6 +154,7 @@ import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.provenance.IdentifierLookup;
@@ -184,11 +189,13 @@ import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.stream.io.LimitingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@@ -218,6 +225,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -279,6 +287,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<>();
     private final AtomicReference<CounterRepository> counterRepositoryRef;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
     private final StandardControllerServiceProvider controllerServiceProvider;
     private final Authorizer authorizer;
     private final AuditService auditService;
@@ -1012,13 +1021,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @param type processor type
      * @param id processor id
+     * @param coordinate the coordinate of the bundle for this processor
      * @return new processor
      * @throws NullPointerException if either arg is null
      * @throws ProcessorInstantiationException if the processor cannot be
      * instantiated for any reason
      */
-    public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException {
-        return createProcessor(type, id, true);
+    public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) throws ProcessorInstantiationException {
+        return createProcessor(type, id, coordinate, true);
     }
 
     /**
@@ -1029,6 +1039,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @param type the fully qualified Processor class name
      * @param id the unique ID of the Processor
+     * @param coordinate the bundle coordinate for this processor
      * @param firstTimeAdded whether or not this is the first time this
      * Processor is added to the graph. If {@code true}, will invoke methods
      * annotated with the {@link OnAdded} annotation.
@@ -1037,39 +1048,63 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws ProcessorInstantiationException if the processor cannot be
      * instantiated for any reason
      */
-    public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
+    public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) throws ProcessorInstantiationException {
+        return createProcessor(type, id, coordinate, firstTimeAdded, true);
+    }
+
+    /**
+     * <p>
+     * Creates a new ProcessorNode with the given type and identifier and
+     * optionally initializes it.
+     * </p>
+     *
+     * @param type the fully qualified Processor class name
+     * @param id the unique ID of the Processor
+     * @param coordinate the bundle coordinate for this processor
+     * @param firstTimeAdded whether or not this is the first time this
+     * Processor is added to the graph. If {@code true}, will invoke methods
+     * annotated with the {@link OnAdded} annotation.
+     * @return new processor node
+     * @throws NullPointerException if either arg is null
+     * @throws ProcessorInstantiationException if the processor cannot be
+     * instantiated for any reason
+     */
+    public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded, final boolean registerLogObserver)
+            throws ProcessorInstantiationException {
         id = id.intern();
 
         boolean creationSuccessful;
-        Processor processor;
+        LoggableComponent<Processor> processor;
         try {
-            processor = instantiateProcessor(type, id);
+            processor = instantiateProcessor(type, id, coordinate);
             creationSuccessful = true;
         } catch (final ProcessorInstantiationException pie) {
             LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie);
             final GhostProcessor ghostProc = new GhostProcessor();
             ghostProc.setIdentifier(id);
             ghostProc.setCanonicalClassName(type);
-            processor = ghostProc;
+            processor = new LoggableComponent<>(ghostProc, coordinate, null);
             creationSuccessful = false;
         }
 
-        final ComponentLog logger = new SimpleProcessLogger(id, processor);
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
         final ProcessorNode procNode;
         if (creationSuccessful) {
-            procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, logger);
+            procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry);
         } else {
             final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
             final String componentType = "(Missing) " + simpleClassName;
-            procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, logger);
+            procNode = new StandardProcessorNode(
+                    processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, true);
         }
 
         final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
-        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+        if (registerLogObserver) {
+            logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+        }
 
         try {
-            final Class<?> procClass = processor.getClass();
+            final Class<?> procClass = procNode.getProcessor().getClass();
             if(procClass.isAnnotationPresent(DefaultSettings.class)) {
                 DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class);
                 try {
@@ -1083,27 +1118,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 } catch(Throwable ex) {
                     LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
                 }
-                try {
-                    procNode.setBulletinLevel(ds.bulletinLevel());
-                } catch (Throwable ex) {
-                    LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex);
-                }
 
+                // calling setBulletinLevel changes the level in the LogRepository so we only want to do this when
+                // the caller said to register the log observer, otherwise we could be changing the level when we didn't mean to
+                if (registerLogObserver) {
+                    try {
+                        procNode.setBulletinLevel(ds.bulletinLevel());
+                    } catch (Throwable ex) {
+                        LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s", ex.getMessage()), ex);
+                    }
+                }
             }
         } catch (Throwable ex) {
             LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex);
         }
 
         if (firstTimeAdded) {
-            try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
-                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
+            try (final NarCloseable x = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
             } catch (final Exception e) {
-                logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                if (registerLogObserver) {
+                    logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+                }
                 throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
             }
 
             if (firstTimeAdded) {
-                try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), processor.getIdentifier())) {
+                try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
                 }
             }
@@ -1112,30 +1153,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return procNode;
     }
 
-    private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException {
-        Processor processor;
+    private LoggableComponent<Processor> instantiateProcessor(final String type, final String identifier, final BundleCoordinate bundleCoordinate) throws ProcessorInstantiationException {
+        final Bundle processorBundle = ExtensionManager.getBundle(bundleCoordinate);
+        if (processorBundle == null) {
+            throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
+        }
 
         final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type, identifier);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type, identifier));
-            }
-
+            final ClassLoader detectedClassLoaderForType = ExtensionManager.createInstanceClassLoader(type, identifier, processorBundle);
+            final Class<?> rawClass = Class.forName(type, true, processorBundle.getClassLoader());
             Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+
             final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
-            processor = processorClass.newInstance();
+            final Processor processor = processorClass.newInstance();
+
             final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor);
             final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this, nifiProperties);
             processor.initialize(ctx);
 
             LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger);
-            return processor;
+
+            return new LoggableComponent<>(processor, bundleCoordinate, componentLogger);
         } catch (final Throwable t) {
             throw new ProcessorInstantiationException(type, t);
         } finally {
@@ -1145,6 +1184,34 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    public void changeProcessorType(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) throws ProcessorInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ProcessorNode cannot be null");
+        }
+
+        final String id = existingNode.getProcessor().getIdentifier();
+
+        // createProcessor will create a new instance class loader for the same id so
+        // save the instance class loader to use it for calling OnRemoved on the existing processor
+        final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+
+        // create a new node with firstTimeAdded as true so lifecycle methods get fired
+        // attempt the creation to make sure it works before firing the OnRemoved methods below
+        final ProcessorNode newNode = createProcessor(newType, id, bundleCoordinate, true, false);
+
+        // call OnRemoved for the existing processor using the previous instance class loader
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            final StandardProcessContext processContext = new StandardProcessContext(
+                    existingNode, controllerServiceProvider, encryptor, getStateManagerProvider().getStateManager(id), variableRegistry);
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
+        }
+
+        // set the new processor in the existing node
+        final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), newNode.getLogger());
+        existingNode.setProcessor(newProcessor);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+    }
+
     /**
      * @return the ExtensionManager used for instantiating Processors,
      * Prioritizers, etc.
@@ -1459,13 +1526,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller failed.
      * If this exception is thrown, then the controller should be considered
      * unsafe to be used
+     * @throws MissingBundleException if the proposed flow cannot be loaded by the
+     * controller because it contains a bundle that does not exist in the controller
      */
     public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+            throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
             synchronizer.sync(this, dataFlow, encryptor);
+            flowSynchronized.set(true);
             LOG.info("Successfully synchronized controller with proposed flow");
         } finally {
             writeLock.unlock();
@@ -1630,7 +1700,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             // Instantiate Controller Services
             //
             for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
-                final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true);
+                final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
+                final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, true);
 
                 serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
                 serviceNode.setComments(controllerServiceDTO.getComments());
@@ -1717,7 +1788,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             // Instantiate the processors
             //
             for (final ProcessorDTO processorDTO : dto.getProcessors()) {
-                final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId());
+                final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle());
+                final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
 
                 procNode.setPosition(toPosition(processorDTO.getPosition()));
                 procNode.setProcessGroup(group);
@@ -1953,45 +2025,83 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    private void verifyBundleInSnippet(final BundleDTO requiredBundle, final Set<BundleCoordinate> supportedBundles) {
+        final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+        if (!supportedBundles.contains(requiredCoordinate)) {
+            throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
+        }
+    }
+
+    private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (templateContents.getProcessors() != null) {
+            templateContents.getProcessors().forEach(processor -> {
+                if (processor.getBundle() == null) {
+                    throw new IllegalArgumentException("Processor bundle must be specified.");
+                }
+
+                if (supportedTypes.containsKey(processor.getType())) {
+                    verifyBundleInSnippet(processor.getBundle(), supportedTypes.get(processor.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
+                }
+            });
+        }
+
+        if (templateContents.getProcessGroups() != null) {
+            templateContents.getProcessGroups().forEach(processGroup -> {
+                verifyProcessorsInSnippet(processGroup.getContents(), supportedTypes);
+            });
+        }
+    }
+
+    private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (templateContents.getControllerServices() != null) {
+            templateContents.getControllerServices().forEach(controllerService -> {
+                if (supportedTypes.containsKey(controllerService.getType())) {
+                    if (controllerService.getBundle() == null) {
+                        throw new IllegalArgumentException("Controller Service bundle must be specified.");
+                    }
+
+                    verifyBundleInSnippet(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
+                } else {
+                    throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
+                }
+            });
+        }
+
+        if (templateContents.getProcessGroups() != null) {
+            templateContents.getProcessGroups().forEach(processGroup -> {
+                verifyControllerServicesInSnippet(processGroup.getContents(), supportedTypes);
+            });
+        }
+    }
+
     public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
-        // validate that all Processor Types and Prioritizer Types are valid
-        final Set<String> processorClasses = new HashSet<>();
+        final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
         for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
-            processorClasses.add(c.getName());
+            final String name = c.getName();
+            processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
         }
+        verifyProcessorsInSnippet(templateContents, processorClasses);
+
+        final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+            final String name = c.getName();
+            controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+        }
+        verifyControllerServicesInSnippet(templateContents, controllerServiceClasses);
+
         final Set<String> prioritizerClasses = new HashSet<>();
         for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
             prioritizerClasses.add(c.getName());
         }
-        final Set<String> controllerServiceClasses = new HashSet<>();
-        for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
-            controllerServiceClasses.add(c.getName());
-        }
 
-        final Set<ProcessorDTO> allProcs = new HashSet<>();
         final Set<ConnectionDTO> allConns = new HashSet<>();
-        allProcs.addAll(templateContents.getProcessors());
         allConns.addAll(templateContents.getConnections());
         for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) {
-            allProcs.addAll(findAllProcessors(childGroup));
             allConns.addAll(findAllConnections(childGroup));
         }
 
-        for (final ProcessorDTO proc : allProcs) {
-            if (!processorClasses.contains(proc.getType())) {
-                throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
-            }
-        }
-
-        final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices();
-        if (controllerServices != null) {
-            for (final ControllerServiceDTO service : controllerServices) {
-                if (!controllerServiceClasses.contains(service.getType())) {
-                    throw new IllegalStateException("Invalid Controller Service Type: " + service.getType());
-                }
-            }
-        }
-
         for (final ConnectionDTO conn : allConns) {
             final List<String> prioritizers = conn.getPrioritizers();
             if (prioritizers != null) {
@@ -2047,24 +2157,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     /**
-     * Recursively finds all ProcessorDTO's
-     *
-     * @param group group
-     * @return processor dto set
-     */
-    private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
-        final Set<ProcessorDTO> procs = new HashSet<>();
-        for (final ProcessorDTO dto : group.getContents().getProcessors()) {
-            procs.add(dto);
-        }
-
-        for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) {
-            procs.addAll(findAllProcessors(childGroup));
-        }
-        return procs;
-    }
-
-    /**
      * Recursively finds all ConnectionDTO's
      *
      * @param group group
@@ -2110,16 +2202,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
-            final Class<?> rawClass;
-            if (detectedClassLoaderForType == null) {
-                // try to find from the current class loader
-                rawClass = Class.forName(type);
-            } else {
-                // try to find from the registered classloader for that type
-                rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type));
+            final List<Bundle> prioritizerBundles = ExtensionManager.getBundles(type);
+            if (prioritizerBundles.size() == 0) {
+                throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type));
+            }
+            if (prioritizerBundles.size() > 1) {
+                throw new IllegalStateException(String.format("Multiple bundles found for the specified class '%s', only one is allowed.", type));
             }
 
+            final Bundle bundle = prioritizerBundles.get(0);
+            final ClassLoader detectedClassLoaderForType = bundle.getClassLoader();
+            final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForType);
+
             Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
             final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class);
             final Object processorObj = prioritizerClass.newInstance();
@@ -2772,6 +2866,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return initialized.get();
     }
 
+    public boolean isFlowSynchronized() {
+        return flowSynchronized.get();
+    }
+
     public void startConnectable(final Connectable connectable) {
         final ProcessGroup group = requireNonNull(connectable).getProcessGroup();
 
@@ -2835,83 +2933,66 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         lookupGroup(groupId).stopProcessing();
     }
 
-    public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
-        return createReportingTask(type, true);
+    public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, bundleCoordinate, true);
     }
 
-    public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
-        return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
+    public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, UUID.randomUUID().toString(), bundleCoordinate, firstTimeAdded);
     }
 
     @Override
-    public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
-        return createReportingTask(type, id, firstTimeAdded, true);
+    public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate,final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, id, bundleCoordinate, firstTimeAdded, true);
     }
 
-    public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded, final boolean register) throws ReportingTaskInstantiationException {
-        if (type == null || id == null) {
+    public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded, final boolean register)
+            throws ReportingTaskInstantiationException {
+        if (type == null || id == null || bundleCoordinate == null) {
             throw new NullPointerException();
         }
 
-        ReportingTask task = null;
+        LoggableComponent<ReportingTask> task = null;
         boolean creationSuccessful = true;
-        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type, id);
-            final Class<?> rawClass;
-            if (detectedClassLoader == null) {
-                rawClass = Class.forName(type);
-            } else {
-                rawClass = Class.forName(type, false, detectedClassLoader);
-            }
-
-            Thread.currentThread().setContextClassLoader(detectedClassLoader);
-            final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
-            final Object reportingTaskObj = reportingTaskClass.newInstance();
-            task = reportingTaskClass.cast(reportingTaskObj);
+            task = instantiateReportingTask(type, id, bundleCoordinate);
         } catch (final Exception e) {
             LOG.error("Could not create Reporting Task of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
             final GhostReportingTask ghostTask = new GhostReportingTask();
             ghostTask.setIdentifier(id);
             ghostTask.setCanonicalClassName(type);
-            task = ghostTask;
+            task = new LoggableComponent<>(ghostTask, bundleCoordinate, null);
             creationSuccessful = false;
-        } finally {
-            if (ctxClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(ctxClassLoader);
-            }
         }
 
-        final ComponentLog logger = new SimpleProcessLogger(id, task);
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
         final ReportingTaskNode taskNode;
         if (creationSuccessful) {
-            taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, logger);
+            taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry);
         } else {
             final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
             final String componentType = "(Missing) " + simpleClassName;
 
-            taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, logger);
+            taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, true);
         }
 
-        taskNode.setName(task.getClass().getSimpleName());
+        taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
 
         if (firstTimeAdded) {
-            final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
             final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this, nifiProperties);
+                    SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties);
 
             try {
-                task.initialize(config);
+                taskNode.getReportingTask().initialize(config);
             } catch (final InitializationException ie) {
                 throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
             }
 
             try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) {
-                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
             } catch (final Exception e) {
-                throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
+                throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
             }
         }
 
@@ -2927,6 +3008,63 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return taskNode;
     }
 
+    private LoggableComponent<ReportingTask> instantiateReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate)
+            throws ReportingTaskInstantiationException {
+
+        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            final Bundle reportingTaskBundle = ExtensionManager.getBundle(bundleCoordinate);
+            if (reportingTaskBundle == null) {
+                throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
+            }
+
+            final ClassLoader detectedClassLoader = ExtensionManager.createInstanceClassLoader(type, id, reportingTaskBundle);
+            final Class<?> rawClass = Class.forName(type, false, detectedClassLoader);
+            Thread.currentThread().setContextClassLoader(detectedClassLoader);
+
+            final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
+            final Object reportingTaskObj = reportingTaskClass.newInstance();
+
+            final ReportingTask reportingTask = reportingTaskClass.cast(reportingTaskObj);
+            final ComponentLog componentLog = new SimpleProcessLogger(id, reportingTask);
+
+            return new LoggableComponent<>(reportingTask, bundleCoordinate, componentLog);
+        } catch (final Exception e) {
+            throw new ReportingTaskInstantiationException(type, e);
+        } finally {
+            if (ctxClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(ctxClassLoader);
+            }
+        }
+    }
+
+    @Override
+    public void changeReportingTaskType(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
+        }
+
+        final String id = existingNode.getReportingTask().getIdentifier();
+
+        // createReportingTask will create a new instance class loader for the same id so
+        // save the instance class loader to use it for calling OnRemoved on the existing processor
+        final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+
+        // set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
+        // attempt the creation to make sure it works before firing the OnRemoved methods below
+        final ReportingTaskNode newNode = createReportingTask(newType, id, bundleCoordinate, true, false);
+
+        // call OnRemoved for the existing reporting task using the previous instance class loader
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext());
+        }
+
+        // set the new reporting task into the existing node
+        final LoggableComponent<ReportingTask> newReportingTask = new LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), newNode.getLogger());
+        existingNode.setReportingTask(newReportingTask);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+    }
+
     @Override
     public ReportingTaskNode getReportingTaskNode(final String taskId) {
         return reportingTasks.get(taskId);
@@ -2988,8 +3126,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+    public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
+        final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, firstTimeAdded);
 
         // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
         final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
@@ -3007,6 +3145,42 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return serviceNode;
     }
 
+    public void changeControllerServiceType(final ControllerServiceNode existingNode, final String newType, final BundleCoordinate bundleCoordinate)
+            throws ControllerServiceInstantiationException {
+        if (existingNode == null) {
+            throw new IllegalStateException("Existing ControllerServiceNode cannot be null");
+        }
+
+        final String id = existingNode.getIdentifier();
+
+        // createControllerService will create a new instance class loader for the same id so
+        // save the instance class loader to use it for calling OnRemoved on the existing service
+        final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+
+        // create a new node with firstTimeAdded as true so lifecycle methods get called
+        // attempt the creation to make sure it works before firing the OnRemoved methods below
+        final ControllerServiceNode newNode = controllerServiceProvider.createControllerService(newType, id, bundleCoordinate, true);
+
+        // call OnRemoved for the existing service using the previous instance class loader
+        try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+            final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, controllerServiceProvider, null, variableRegistry);
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), configurationContext);
+        }
+
+        // take the invocation handler that was created for new proxy and is set to look at the new node,
+        // and set it to look at the existing node
+        final ControllerServiceInvocationHandler invocationHandler = newNode.getInvocationHandler();
+        invocationHandler.setServiceNode(existingNode);
+
+        // create LoggableComponents for the proxy and implementation
+        final LoggableComponent<ControllerService> loggableProxy = new LoggableComponent<>(newNode.getProxiedControllerService(), bundleCoordinate, newNode.getLogger());
+        final LoggableComponent<ControllerService> loggableImplementation = new LoggableComponent<>(newNode.getControllerServiceImplementation(), bundleCoordinate, newNode.getLogger());
+
+        // set the new impl, proxy, and invocation handler into the existing node
+        existingNode.setControllerServiceAndProxy(loggableImplementation, loggableProxy, invocationHandler);
+        existingNode.setExtensionMissing(newNode.isExtensionMissing());
+    }
+
     @Override
     public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
         reportingTaskNode.verifyCanEnable();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
new file mode 100644
index 0000000..175f252
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.cluster.ConnectionException;
+
+/**
+ * Represents the exceptional case when a node fails to join the cluster because a bundle being used by the cluster does not exist on the node.
+ */
+public class MissingBundleException extends ConnectionException {
+
+    private static final long serialVersionUID = 198234798234794L;
+
+    public MissingBundleException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+    public MissingBundleException(Throwable cause) {
+        super(cause);
+    }
+
+    public MissingBundleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MissingBundleException(String message) {
+        super(message);
+    }
+
+    public MissingBundleException() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
new file mode 100644
index 0000000..687d5c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.Processor;
+
+/**
+ * Holder for StandardProcessorNode to atomically swap out the component.
+ */
+public class ProcessorDetails {
+
+    private final Processor processor;
+    private final Class<?> procClass;
+    private final boolean triggerWhenEmpty;
+    private final boolean sideEffectFree;
+    private final boolean triggeredSerially;
+    private final boolean triggerWhenAnyDestinationAvailable;
+    private final boolean eventDrivenSupported;
+    private final boolean batchSupported;
+    private final InputRequirement.Requirement inputRequirement;
+    private final ComponentLog componentLog;
+    private final BundleCoordinate bundleCoordinate;
+
+    public ProcessorDetails(final LoggableComponent<Processor> processor) {
+        this.processor = processor.getComponent();
+        this.componentLog = processor.getLogger();
+        this.bundleCoordinate = processor.getBundleCoordinate();
+
+        this.procClass = this.processor.getClass();
+        this.triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class);
+        this.sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class);
+        this.batchSupported = procClass.isAnnotationPresent(SupportsBatching.class);
+        this.triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class);
+        this.triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
+        this.eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty;
+
+        final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class);
+        if (inputRequirementPresent) {
+            this.inputRequirement = procClass.getAnnotation(InputRequirement.class).value();
+        } else {
+            this.inputRequirement = InputRequirement.Requirement.INPUT_ALLOWED;
+        }
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+    public Class<?> getProcClass() {
+        return procClass;
+    }
+
+    public boolean isTriggerWhenEmpty() {
+        return triggerWhenEmpty;
+    }
+
+    public boolean isSideEffectFree() {
+        return sideEffectFree;
+    }
+
+    public boolean isTriggeredSerially() {
+        return triggeredSerially;
+    }
+
+    public boolean isTriggerWhenAnyDestinationAvailable() {
+        return triggerWhenAnyDestinationAvailable;
+    }
+
+    public boolean isEventDrivenSupported() {
+        return eventDrivenSupported;
+    }
+
+    public boolean isBatchSupported() {
+        return batchSupported;
+    }
+
+    public InputRequirement.Requirement getInputRequirement() {
+        return inputRequirement;
+    }
+
+    public ComponentLog getComponentLog() {
+        return componentLog;
+    }
+
+    public BundleCoordinate getBundleCoordinate() {
+        return bundleCoordinate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 2d35a63..0ce6742 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,39 +16,10 @@
  */
 package org.apache.nifi.controller;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.cluster.ConnectionException;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@@ -94,6 +65,38 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
 public class StandardFlowService implements FlowService, ProtocolHandler {
 
     private static final String EVENT_CATEGORY = "Controller";
@@ -135,11 +138,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
      */
     private NodeIdentifier nodeId;
 
-    private final NiFiProperties nifiProperties;
-
     // guardedBy rwLock
     private boolean firstControllerInitialization = true;
 
+    private final NiFiProperties nifiProperties;
+
     private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster because ";
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);
 
@@ -434,9 +437,28 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
     }
 
     @Override
-    public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+    public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
         if (configuredForClustering) {
-            final DataFlow proposedFlow = (dataFlow == null) ? createDataFlow() : dataFlow;
+            // Create the initial flow from disk if it exists, or from serializing the empty root group in flow controller
+            final DataFlow initialFlow = (dataFlow == null) ? createDataFlow() : dataFlow;
+            if (logger.isTraceEnabled()) {
+                logger.trace("InitialFlow = " + new String(initialFlow.getFlow(), StandardCharsets.UTF_8));
+            }
+
+            // Sync the initial flow into the flow controller so that if the flow came from disk we loaded the
+            // whole flow into the flow controller and applied any bundle upgrades
+            writeLock.lock();
+            try {
+                loadFromBytes(initialFlow, true);
+            } finally {
+                writeLock.unlock();
+            }
+
+            // Get the proposed flow by serializing the flow controller which now has the synced version from above
+            final DataFlow proposedFlow = createDataFlowFromController();
+            if (logger.isTraceEnabled()) {
+                logger.trace("ProposedFlow = " + new String(proposedFlow.getFlow(), StandardCharsets.UTF_8));
+            }
 
             /*
              * Attempt to connect to the cluster. If the manager is able to
@@ -457,9 +479,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                 if (response == null || response.shouldTryLater()) {
                     logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received.");
 
-                    // load local proposed flow
-                    loadFromBytes(proposedFlow, false);
-
                     // set node ID on controller before we start heartbeating because heartbeat needs node ID
                     controller.setNodeId(nodeId);
                     clusterCoordinator.setLocalNodeIdentifier(nodeId);
@@ -479,6 +498,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                      */
                     controller.startHeartbeating();
 
+                    // Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
+                    initializeController();
+
                     // notify controller that flow is initialized
                     try {
                         controller.onFlowInitialized(autoResumeState);
@@ -491,21 +513,26 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                 } else {
                     try {
                         loadFromConnectionResponse(response);
-                        dao.save(controller, true);
                     } catch (final Exception e) {
                         logger.error("Failed to load flow from cluster due to: " + e, e);
                         handleConnectionFailure(e);
                         throw new IOException(e);
                     }
                 }
+
+                // save the flow in the controller so we write out the latest flow with any updated bundles to disk
+                dao.save(controller, true);
+
             } finally {
                 writeLock.unlock();
             }
         } else {
             writeLock.lock();
             try {
-                // operating in standalone mode, so load proposed flow
+                // operating in standalone mode, so load proposed flow and initialize the controller
                 loadFromBytes(dataFlow, true);
+                initializeController();
+                dao.save(controller, true);
             } finally {
                 writeLock.unlock();
             }
@@ -516,6 +543,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         DisconnectionCode disconnectionCode;
         if (ex instanceof UninheritableFlowException) {
             disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
+        } else if (ex instanceof MissingBundleException) {
+            disconnectionCode = DisconnectionCode.MISSING_BUNDLE;
         } else if (ex instanceof FlowSynchronizationException) {
             disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
         } else {
@@ -533,7 +562,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             // create the response
             final FlowResponseMessage response = new FlowResponseMessage();
-            response.setDataFlow(createDataFlow());
+            response.setDataFlow(createDataFlowFromController());
             return response;
         } catch (final Exception ex) {
             throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, ex);
@@ -549,15 +578,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
     @Override
     public StandardDataFlow createDataFlow() throws IOException {
-        final byte[] snippetBytes = controller.getSnippetManager().export();
-        final byte[] authorizerFingerprint = getAuthorizerFingerprint();
-
         // Load the flow from disk if the file exists.
         if (dao.isFlowPresent()) {
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             dao.load(baos);
             final byte[] bytes = baos.toByteArray();
-            final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint);
+
+            final byte[] snippetBytes = controller.getSnippetManager().export();
+            final byte[] authorizerFingerprint = getAuthorizerFingerprint();
+            final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint, new HashSet<>());
             return fromDisk;
         }
 
@@ -566,14 +595,28 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         // will automatically create a Root Process Group, and we need to ensure that
         // we replicate that Process Group to all nodes in the cluster, so that they all
         // end up with the same ID for the root Process Group.
+        return createDataFlowFromController();
+    }
+
+    @Override
+    public StandardDataFlow createDataFlowFromController() throws IOException {
+        final byte[] snippetBytes = controller.getSnippetManager().export();
+        final byte[] authorizerFingerprint = getAuthorizerFingerprint();
+
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         dao.save(controller, baos);
         final byte[] flowBytes = baos.toByteArray();
         baos.reset();
 
-        return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint);
+        final Set<String> missingComponents = new HashSet<>();
+        controller.getRootGroup().findAllProcessors().stream().filter(p -> p.isExtensionMissing()).forEach(p -> missingComponents.add(p.getIdentifier()));
+        controller.getAllControllerServices().stream().filter(cs -> cs.isExtensionMissing()).forEach(cs -> missingComponents.add(cs.getIdentifier()));
+        controller.getAllReportingTasks().stream().filter(r -> r.isExtensionMissing()).forEach(r -> missingComponents.add(r.getIdentifier()));
+
+        return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, missingComponents);
     }
 
+
     private NodeIdentifier getNodeId() {
         readLock.lock();
         try {
@@ -593,7 +636,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             if (connectionResponse.getDataFlow() == null) {
                 logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow.");
-                connectionResponse = connect(false, false, createDataFlow());
+                connectionResponse = connect(false, false, createDataFlowFromController());
             }
 
             loadFromConnectionResponse(connectionResponse);
@@ -623,7 +666,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         writeLock.lock();
         try {
 
-            logger.info("Disconnecting node.");
+            logger.info("Disconnecting node due to " + explanation);
 
             // mark node as not connected
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.UNKNOWN, explanation));
@@ -638,7 +681,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             controller.setClustered(false, null);
             clusterCoordinator.setConnected(false);
 
-            logger.info("Node disconnected.");
+            logger.info("Node disconnected due to " + explanation);
 
         } finally {
             writeLock.unlock();
@@ -647,31 +690,30 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
     // write lock must already be acquired
     private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow)
-            throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+            throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
         logger.trace("Loading flow from bytes");
 
         // resolve the given flow (null means load flow from disk)
         final DataFlow actualProposedFlow;
         final byte[] flowBytes;
         final byte[] authorizerFingerprint;
+        final Set<String> missingComponents;
+
         if (proposedFlow == null) {
             final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
             copyCurrentFlow(flowOnDisk);
             flowBytes = flowOnDisk.toByteArray();
             authorizerFingerprint = getAuthorizerFingerprint();
+            missingComponents = new HashSet<>();
             logger.debug("Loaded Flow from bytes");
         } else {
             flowBytes = proposedFlow.getFlow();
             authorizerFingerprint = proposedFlow.getAuthorizerFingerprint();
+            missingComponents = proposedFlow.getMissingComponents();
             logger.debug("Loaded flow from proposed flow");
         }
 
-        actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint);
-
-        if (firstControllerInitialization) {
-            // load the controller services
-            logger.debug("Loading controller services");
-        }
+        actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint, missingComponents);
 
         // load the flow
         logger.debug("Loading proposed flow into FlowController");
@@ -682,6 +724,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
         }
 
+
+
         final List<Template> templates = loadTemplates();
         for (final Template template : templates) {
             final Template existing = rootGroup.getTemplate(template.getIdentifier());
@@ -692,16 +736,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                 logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName());
             }
         }
-
-        // lazy initialization of controller tasks and flow
-        if (firstControllerInitialization) {
-            logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
-
-            // initialize the flow
-            controller.initializeFlow();
-
-            firstControllerInitialization = false;
-        }
     }
 
     /**
@@ -867,6 +901,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             // get the dataflow from the response
             final DataFlow dataFlow = response.getDataFlow();
+            if (logger.isTraceEnabled()) {
+                logger.trace("ResponseFlow = " + new String(dataFlow.getFlow(), StandardCharsets.UTF_8));
+            }
 
             // load new controller state
             loadFromBytes(dataFlow, true);
@@ -884,6 +921,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
 
+            // Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
+            initializeController();
+
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(autoResumeState);
 
@@ -892,6 +932,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             controller.startHeartbeating();
         } catch (final UninheritableFlowException ufe) {
             throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
+        } catch (final MissingBundleException mbe) {
+            throw new MissingBundleException(CONNECTION_EXCEPTION_MSG_PREFIX + "cluster flow contains bundles that do not exist on the current node", mbe);
         } catch (final FlowSerializationException fse) {
             throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse);
         } catch (final FlowSynchronizationException fse) {
@@ -905,6 +947,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
     }
 
+    private void initializeController() throws IOException {
+        if (firstControllerInitialization) {
+            logger.debug("First controller initialization, initializing controller...");
+            controller.initializeFlow();
+            firstControllerInitialization = false;
+        }
+    }
+
     @Override
     public void copyCurrentFlow(final OutputStream os) throws IOException {
         readLock.lock();
@@ -939,9 +989,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
         @Override
         public void run() {
-            final ClassLoader currentCl = Thread.currentThread().getContextClassLoader();
-            final ClassLoader cl = NarClassLoaders.getInstance().getFrameworkClassLoader();
-            Thread.currentThread().setContextClassLoader(cl);
+            ClassLoader currentCl = null;
+
+            final Bundle frameworkBundle = NarClassLoaders.getInstance().getFrameworkBundle();
+            if (frameworkBundle != null) {
+                currentCl = Thread.currentThread().getContextClassLoader();
+                final ClassLoader cl = frameworkBundle.getClassLoader();
+                Thread.currentThread().setContextClassLoader(cl);
+            }
+
             try {
                 //Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again
                 final SaveHolder holder = StandardFlowService.this.saveHolder.get();