You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/03/24 15:11:01 UTC
[14/17] nifi git commit: NIFI-3380 Bumping NAR plugin to
1.2.0-SNAPSHOT development to leverage changes from master,
adding buildnumber-maven-plugin to nifi-nar-bundles to properly set build info
in MANIFEST of NARs - Refactoring NarDetails to include al
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 191fc65..f312096 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@@ -63,6 +65,7 @@ import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.label.StandardLabel;
@@ -105,6 +108,7 @@ import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
+import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
@@ -150,6 +154,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.provenance.IdentifierLookup;
@@ -184,11 +189,13 @@ import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@@ -218,6 +225,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -279,6 +287,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<>();
private final AtomicReference<CounterRepository> counterRepositoryRef;
private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
private final StandardControllerServiceProvider controllerServiceProvider;
private final Authorizer authorizer;
private final AuditService auditService;
@@ -1012,13 +1021,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*
* @param type processor type
* @param id processor id
+ * @param coordinate the coordinate of the bundle for this processor
* @return new processor
* @throws NullPointerException if either arg is null
* @throws ProcessorInstantiationException if the processor cannot be
* instantiated for any reason
*/
- public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException {
- return createProcessor(type, id, true);
+ public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) throws ProcessorInstantiationException {
+ return createProcessor(type, id, coordinate, true);
}
/**
@@ -1029,6 +1039,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*
* @param type the fully qualified Processor class name
* @param id the unique ID of the Processor
+ * @param coordinate the bundle coordinate for this processor
* @param firstTimeAdded whether or not this is the first time this
* Processor is added to the graph. If {@code true}, will invoke methods
* annotated with the {@link OnAdded} annotation.
@@ -1037,39 +1048,63 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws ProcessorInstantiationException if the processor cannot be
* instantiated for any reason
*/
- public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException {
+ public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) throws ProcessorInstantiationException {
+ return createProcessor(type, id, coordinate, firstTimeAdded, true);
+ }
+
+ /**
+ * <p>
+ * Creates a new ProcessorNode with the given type and identifier and
+ * optionally initializes it.
+ * </p>
+ *
+ * @param type the fully qualified Processor class name
+ * @param id the unique ID of the Processor
+ * @param coordinate the bundle coordinate for this processor
+ * @param firstTimeAdded whether or not this is the first time this
+ * Processor is added to the graph. If {@code true}, will invoke methods
+ * annotated with the {@link OnAdded} annotation.
+ * @return new processor node
+ * @throws NullPointerException if either arg is null
+ * @throws ProcessorInstantiationException if the processor cannot be
+ * instantiated for any reason
+ */
+ public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded, final boolean registerLogObserver)
+ throws ProcessorInstantiationException {
id = id.intern();
boolean creationSuccessful;
- Processor processor;
+ LoggableComponent<Processor> processor;
try {
- processor = instantiateProcessor(type, id);
+ processor = instantiateProcessor(type, id, coordinate);
creationSuccessful = true;
} catch (final ProcessorInstantiationException pie) {
LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie);
final GhostProcessor ghostProc = new GhostProcessor();
ghostProc.setIdentifier(id);
ghostProc.setCanonicalClassName(type);
- processor = ghostProc;
+ processor = new LoggableComponent<>(ghostProc, coordinate, null);
creationSuccessful = false;
}
- final ComponentLog logger = new SimpleProcessLogger(id, processor);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
final ProcessorNode procNode;
if (creationSuccessful) {
- procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, logger);
+ procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
- procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, logger);
+ procNode = new StandardProcessorNode(
+ processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, true);
}
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
- logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+ if (registerLogObserver) {
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
+ }
try {
- final Class<?> procClass = processor.getClass();
+ final Class<?> procClass = procNode.getProcessor().getClass();
if(procClass.isAnnotationPresent(DefaultSettings.class)) {
DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class);
try {
@@ -1083,27 +1118,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} catch(Throwable ex) {
LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
}
- try {
- procNode.setBulletinLevel(ds.bulletinLevel());
- } catch (Throwable ex) {
- LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex);
- }
+ // calling setBulletinLevel changes the level in the LogRepository so we only want to do this when
+ // the caller said to register the log observer, otherwise we could be changing the level when we didn't mean to
+ if (registerLogObserver) {
+ try {
+ procNode.setBulletinLevel(ds.bulletinLevel());
+ } catch (Throwable ex) {
+ LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s", ex.getMessage()), ex);
+ }
+ }
}
} catch (Throwable ex) {
LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex);
}
if (firstTimeAdded) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
} catch (final Exception e) {
- logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+ if (registerLogObserver) {
+ logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
+ }
throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
}
if (firstTimeAdded) {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), processor.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
}
}
@@ -1112,30 +1153,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return procNode;
}
- private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException {
- Processor processor;
+ private LoggableComponent<Processor> instantiateProcessor(final String type, final String identifier, final BundleCoordinate bundleCoordinate) throws ProcessorInstantiationException {
+ final Bundle processorBundle = ExtensionManager.getBundle(bundleCoordinate);
+ if (processorBundle == null) {
+ throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
+ }
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type, identifier);
- final Class<?> rawClass;
- if (detectedClassLoaderForType == null) {
- // try to find from the current class loader
- rawClass = Class.forName(type);
- } else {
- // try to find from the registered classloader for that type
- rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type, identifier));
- }
-
+ final ClassLoader detectedClassLoaderForType = ExtensionManager.createInstanceClassLoader(type, identifier, processorBundle);
+ final Class<?> rawClass = Class.forName(type, true, processorBundle.getClassLoader());
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+
final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
- processor = processorClass.newInstance();
+ final Processor processor = processorClass.newInstance();
+
final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor);
final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this, this, nifiProperties);
processor.initialize(ctx);
LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger);
- return processor;
+
+ return new LoggableComponent<>(processor, bundleCoordinate, componentLogger);
} catch (final Throwable t) {
throw new ProcessorInstantiationException(type, t);
} finally {
@@ -1145,6 +1184,34 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ public void changeProcessorType(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) throws ProcessorInstantiationException {
+ if (existingNode == null) {
+ throw new IllegalStateException("Existing ProcessorNode cannot be null");
+ }
+
+ final String id = existingNode.getProcessor().getIdentifier();
+
+ // createProcessor will create a new instance class loader for the same id so
+ // save the instance class loader to use it for calling OnRemoved on the existing processor
+ final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+
+ // create a new node with firstTimeAdded as true so lifecycle methods get fired
+ // attempt the creation to make sure it works before firing the OnRemoved methods below
+ final ProcessorNode newNode = createProcessor(newType, id, bundleCoordinate, true, false);
+
+ // call OnRemoved for the existing processor using the previous instance class loader
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+ final StandardProcessContext processContext = new StandardProcessContext(
+ existingNode, controllerServiceProvider, encryptor, getStateManagerProvider().getStateManager(id), variableRegistry);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
+ }
+
+ // set the new processor in the existing node
+ final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), newNode.getLogger());
+ existingNode.setProcessor(newProcessor);
+ existingNode.setExtensionMissing(newNode.isExtensionMissing());
+ }
+
/**
* @return the ExtensionManager used for instantiating Processors,
* Prioritizers, etc.
@@ -1459,13 +1526,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws FlowSynchronizationException if updates to the controller failed.
* If this exception is thrown, then the controller should be considered
* unsafe to be used
+ * @throws MissingBundleException if the proposed flow cannot be loaded by the
+ * controller because it contains a bundle that does not exist in the controller
*/
public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
- throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+ throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
writeLock.lock();
try {
LOG.debug("Synchronizing controller with proposed flow");
synchronizer.sync(this, dataFlow, encryptor);
+ flowSynchronized.set(true);
LOG.info("Successfully synchronized controller with proposed flow");
} finally {
writeLock.unlock();
@@ -1630,7 +1700,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Instantiate Controller Services
//
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
- final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true);
+ final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
+ final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, true);
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
serviceNode.setComments(controllerServiceDTO.getComments());
@@ -1717,7 +1788,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Instantiate the processors
//
for (final ProcessorDTO processorDTO : dto.getProcessors()) {
- final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId());
+ final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle());
+ final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
procNode.setPosition(toPosition(processorDTO.getPosition()));
procNode.setProcessGroup(group);
@@ -1953,45 +2025,83 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ private void verifyBundleInSnippet(final BundleDTO requiredBundle, final Set<BundleCoordinate> supportedBundles) {
+ final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+ if (!supportedBundles.contains(requiredCoordinate)) {
+ throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
+ }
+ }
+
+ private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+ if (templateContents.getProcessors() != null) {
+ templateContents.getProcessors().forEach(processor -> {
+ if (processor.getBundle() == null) {
+ throw new IllegalArgumentException("Processor bundle must be specified.");
+ }
+
+ if (supportedTypes.containsKey(processor.getType())) {
+ verifyBundleInSnippet(processor.getBundle(), supportedTypes.get(processor.getType()));
+ } else {
+ throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
+ }
+ });
+ }
+
+ if (templateContents.getProcessGroups() != null) {
+ templateContents.getProcessGroups().forEach(processGroup -> {
+ verifyProcessorsInSnippet(processGroup.getContents(), supportedTypes);
+ });
+ }
+ }
+
+ private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+ if (templateContents.getControllerServices() != null) {
+ templateContents.getControllerServices().forEach(controllerService -> {
+ if (supportedTypes.containsKey(controllerService.getType())) {
+ if (controllerService.getBundle() == null) {
+ throw new IllegalArgumentException("Controller Service bundle must be specified.");
+ }
+
+ verifyBundleInSnippet(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
+ } else {
+ throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
+ }
+ });
+ }
+
+ if (templateContents.getProcessGroups() != null) {
+ templateContents.getProcessGroups().forEach(processGroup -> {
+ verifyControllerServicesInSnippet(processGroup.getContents(), supportedTypes);
+ });
+ }
+ }
+
public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
- // validate that all Processor Types and Prioritizer Types are valid
- final Set<String> processorClasses = new HashSet<>();
+ final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
- processorClasses.add(c.getName());
+ final String name = c.getName();
+ processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
+ verifyProcessorsInSnippet(templateContents, processorClasses);
+
+ final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
+ for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+ final String name = c.getName();
+ controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ }
+ verifyControllerServicesInSnippet(templateContents, controllerServiceClasses);
+
final Set<String> prioritizerClasses = new HashSet<>();
for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
prioritizerClasses.add(c.getName());
}
- final Set<String> controllerServiceClasses = new HashSet<>();
- for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
- controllerServiceClasses.add(c.getName());
- }
- final Set<ProcessorDTO> allProcs = new HashSet<>();
final Set<ConnectionDTO> allConns = new HashSet<>();
- allProcs.addAll(templateContents.getProcessors());
allConns.addAll(templateContents.getConnections());
for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) {
- allProcs.addAll(findAllProcessors(childGroup));
allConns.addAll(findAllConnections(childGroup));
}
- for (final ProcessorDTO proc : allProcs) {
- if (!processorClasses.contains(proc.getType())) {
- throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
- }
- }
-
- final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices();
- if (controllerServices != null) {
- for (final ControllerServiceDTO service : controllerServices) {
- if (!controllerServiceClasses.contains(service.getType())) {
- throw new IllegalStateException("Invalid Controller Service Type: " + service.getType());
- }
- }
- }
-
for (final ConnectionDTO conn : allConns) {
final List<String> prioritizers = conn.getPrioritizers();
if (prioritizers != null) {
@@ -2047,24 +2157,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
- * Recursively finds all ProcessorDTO's
- *
- * @param group group
- * @return processor dto set
- */
- private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) {
- final Set<ProcessorDTO> procs = new HashSet<>();
- for (final ProcessorDTO dto : group.getContents().getProcessors()) {
- procs.add(dto);
- }
-
- for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) {
- procs.addAll(findAllProcessors(childGroup));
- }
- return procs;
- }
-
- /**
* Recursively finds all ConnectionDTO's
*
* @param group group
@@ -2110,16 +2202,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
- final Class<?> rawClass;
- if (detectedClassLoaderForType == null) {
- // try to find from the current class loader
- rawClass = Class.forName(type);
- } else {
- // try to find from the registered classloader for that type
- rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type));
+ final List<Bundle> prioritizerBundles = ExtensionManager.getBundles(type);
+ if (prioritizerBundles.size() == 0) {
+ throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type));
+ }
+ if (prioritizerBundles.size() > 1) {
+ throw new IllegalStateException(String.format("Multiple bundles found for the specified class '%s', only one is allowed.", type));
}
+ final Bundle bundle = prioritizerBundles.get(0);
+ final ClassLoader detectedClassLoaderForType = bundle.getClassLoader();
+ final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForType);
+
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class);
final Object processorObj = prioritizerClass.newInstance();
@@ -2772,6 +2866,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return initialized.get();
}
+ public boolean isFlowSynchronized() {
+ return flowSynchronized.get();
+ }
+
public void startConnectable(final Connectable connectable) {
final ProcessGroup group = requireNonNull(connectable).getProcessGroup();
@@ -2835,83 +2933,66 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
lookupGroup(groupId).stopProcessing();
}
- public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
- return createReportingTask(type, true);
+ public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, bundleCoordinate, true);
}
- public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
- return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
+ public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, UUID.randomUUID().toString(), bundleCoordinate, firstTimeAdded);
}
@Override
- public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
- return createReportingTask(type, id, firstTimeAdded, true);
+ public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate,final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, id, bundleCoordinate, firstTimeAdded, true);
}
- public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded, final boolean register) throws ReportingTaskInstantiationException {
- if (type == null || id == null) {
+ public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded, final boolean register)
+ throws ReportingTaskInstantiationException {
+ if (type == null || id == null || bundleCoordinate == null) {
throw new NullPointerException();
}
- ReportingTask task = null;
+ LoggableComponent<ReportingTask> task = null;
boolean creationSuccessful = true;
- final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type, id);
- final Class<?> rawClass;
- if (detectedClassLoader == null) {
- rawClass = Class.forName(type);
- } else {
- rawClass = Class.forName(type, false, detectedClassLoader);
- }
-
- Thread.currentThread().setContextClassLoader(detectedClassLoader);
- final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
- final Object reportingTaskObj = reportingTaskClass.newInstance();
- task = reportingTaskClass.cast(reportingTaskObj);
+ task = instantiateReportingTask(type, id, bundleCoordinate);
} catch (final Exception e) {
LOG.error("Could not create Reporting Task of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e);
final GhostReportingTask ghostTask = new GhostReportingTask();
ghostTask.setIdentifier(id);
ghostTask.setCanonicalClassName(type);
- task = ghostTask;
+ task = new LoggableComponent<>(ghostTask, bundleCoordinate, null);
creationSuccessful = false;
- } finally {
- if (ctxClassLoader != null) {
- Thread.currentThread().setContextClassLoader(ctxClassLoader);
- }
}
- final ComponentLog logger = new SimpleProcessLogger(id, task);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
final ReportingTaskNode taskNode;
if (creationSuccessful) {
- taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, logger);
+ taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
- taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, logger);
+ taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, true);
}
- taskNode.setName(task.getClass().getSimpleName());
+ taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
if (firstTimeAdded) {
- final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
- SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this, nifiProperties);
+ SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties);
try {
- task.initialize(config);
+ taskNode.getReportingTask().initialize(config);
} catch (final InitializationException ie) {
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
} catch (final Exception e) {
- throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
+ throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
}
}
@@ -2927,6 +3008,63 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return taskNode;
}
+ private LoggableComponent<ReportingTask> instantiateReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate)
+ throws ReportingTaskInstantiationException {
+
+ final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ final Bundle reportingTaskBundle = ExtensionManager.getBundle(bundleCoordinate);
+ if (reportingTaskBundle == null) {
+ throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
+ }
+
+ final ClassLoader detectedClassLoader = ExtensionManager.createInstanceClassLoader(type, id, reportingTaskBundle);
+ final Class<?> rawClass = Class.forName(type, false, detectedClassLoader);
+ Thread.currentThread().setContextClassLoader(detectedClassLoader);
+
+ final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
+ final Object reportingTaskObj = reportingTaskClass.newInstance();
+
+ final ReportingTask reportingTask = reportingTaskClass.cast(reportingTaskObj);
+ final ComponentLog componentLog = new SimpleProcessLogger(id, reportingTask);
+
+ return new LoggableComponent<>(reportingTask, bundleCoordinate, componentLog);
+ } catch (final Exception e) {
+ throw new ReportingTaskInstantiationException(type, e);
+ } finally {
+ if (ctxClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(ctxClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public void changeReportingTaskType(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate) throws ReportingTaskInstantiationException {
+ if (existingNode == null) {
+ throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
+ }
+
+ final String id = existingNode.getReportingTask().getIdentifier();
+
+ // createReportingTask will create a new instance class loader for the same id so
+ // save the instance class loader to use it for calling OnRemoved on the existing reporting task
+ final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+
+ // set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
+ // attempt the creation to make sure it works before firing the OnRemoved methods below
+ final ReportingTaskNode newNode = createReportingTask(newType, id, bundleCoordinate, true, false);
+
+ // call OnRemoved for the existing reporting task using the previous instance class loader
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext());
+ }
+
+ // set the new reporting task into the existing node
+ final LoggableComponent<ReportingTask> newReportingTask = new LoggableComponent<>(newNode.getReportingTask(), newNode.getBundleCoordinate(), newNode.getLogger());
+ existingNode.setReportingTask(newReportingTask);
+ existingNode.setExtensionMissing(newNode.isExtensionMissing());
+ }
+
@Override
public ReportingTaskNode getReportingTaskNode(final String taskId) {
return reportingTasks.get(taskId);
@@ -2988,8 +3126,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
+ final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, firstTimeAdded);
// Register log observer to provide bulletins when reporting task logs anything at WARN level or above
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
@@ -3007,6 +3145,42 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return serviceNode;
}
+ public void changeControllerServiceType(final ControllerServiceNode existingNode, final String newType, final BundleCoordinate bundleCoordinate)
+ throws ControllerServiceInstantiationException {
+ if (existingNode == null) {
+ throw new IllegalStateException("Existing ControllerServiceNode cannot be null");
+ }
+
+ final String id = existingNode.getIdentifier();
+
+ // createControllerService will create a new instance class loader for the same id so
+ // save the instance class loader to use it for calling OnRemoved on the existing service
+ final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+
+ // create a new node with firstTimeAdded as true so lifecycle methods get called
+ // attempt the creation to make sure it works before firing the OnRemoved methods below
+ final ControllerServiceNode newNode = controllerServiceProvider.createControllerService(newType, id, bundleCoordinate, true);
+
+ // call OnRemoved for the existing service using the previous instance class loader
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
+ final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, controllerServiceProvider, null, variableRegistry);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), configurationContext);
+ }
+
+ // take the invocation handler that was created for new proxy and is set to look at the new node,
+ // and set it to look at the existing node
+ final ControllerServiceInvocationHandler invocationHandler = newNode.getInvocationHandler();
+ invocationHandler.setServiceNode(existingNode);
+
+ // create LoggableComponents for the proxy and implementation
+ final LoggableComponent<ControllerService> loggableProxy = new LoggableComponent<>(newNode.getProxiedControllerService(), bundleCoordinate, newNode.getLogger());
+ final LoggableComponent<ControllerService> loggableImplementation = new LoggableComponent<>(newNode.getControllerServiceImplementation(), bundleCoordinate, newNode.getLogger());
+
+ // set the new impl, proxy, and invocation handler into the existing node
+ existingNode.setControllerServiceAndProxy(loggableImplementation, loggableProxy, invocationHandler);
+ existingNode.setExtensionMissing(newNode.isExtensionMissing());
+ }
+
@Override
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanEnable();
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
new file mode 100644
index 0000000..175f252
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/MissingBundleException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.cluster.ConnectionException;
+
+/**
+ * Represents the exceptional case when a node fails to join the cluster because a bundle being used by the cluster does not exist on the node.
+ */
+public class MissingBundleException extends ConnectionException {
+
+ private static final long serialVersionUID = 198234798234794L;
+
+ public MissingBundleException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ public MissingBundleException(Throwable cause) {
+ super(cause);
+ }
+
+ public MissingBundleException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MissingBundleException(String message) {
+ super(message);
+ }
+
+ public MissingBundleException() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
new file mode 100644
index 0000000..687d5c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ProcessorDetails.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.Processor;
+
+/**
+ * Holder for StandardProcessorNode to atomically swap out the component.
+ */
+public class ProcessorDetails {
+
+ private final Processor processor;
+ private final Class<?> procClass;
+ private final boolean triggerWhenEmpty;
+ private final boolean sideEffectFree;
+ private final boolean triggeredSerially;
+ private final boolean triggerWhenAnyDestinationAvailable;
+ private final boolean eventDrivenSupported;
+ private final boolean batchSupported;
+ private final InputRequirement.Requirement inputRequirement;
+ private final ComponentLog componentLog;
+ private final BundleCoordinate bundleCoordinate;
+
+ public ProcessorDetails(final LoggableComponent<Processor> processor) {
+ this.processor = processor.getComponent();
+ this.componentLog = processor.getLogger();
+ this.bundleCoordinate = processor.getBundleCoordinate();
+
+ this.procClass = this.processor.getClass();
+ this.triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class);
+ this.sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class);
+ this.batchSupported = procClass.isAnnotationPresent(SupportsBatching.class);
+ this.triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class);
+ this.triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
+ this.eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty;
+
+ final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class);
+ if (inputRequirementPresent) {
+ this.inputRequirement = procClass.getAnnotation(InputRequirement.class).value();
+ } else {
+ this.inputRequirement = InputRequirement.Requirement.INPUT_ALLOWED;
+ }
+ }
+
+ public Processor getProcessor() {
+ return processor;
+ }
+
+ public Class<?> getProcClass() {
+ return procClass;
+ }
+
+ public boolean isTriggerWhenEmpty() {
+ return triggerWhenEmpty;
+ }
+
+ public boolean isSideEffectFree() {
+ return sideEffectFree;
+ }
+
+ public boolean isTriggeredSerially() {
+ return triggeredSerially;
+ }
+
+ public boolean isTriggerWhenAnyDestinationAvailable() {
+ return triggerWhenAnyDestinationAvailable;
+ }
+
+ public boolean isEventDrivenSupported() {
+ return eventDrivenSupported;
+ }
+
+ public boolean isBatchSupported() {
+ return batchSupported;
+ }
+
+ public InputRequirement.Requirement getInputRequirement() {
+ return inputRequirement;
+ }
+
+ public ComponentLog getComponentLog() {
+ return componentLog;
+ }
+
+ public BundleCoordinate getBundleCoordinate() {
+ return bundleCoordinate;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 2d35a63..0ce6742 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,39 +16,10 @@
*/
package org.apache.nifi.controller;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@@ -94,6 +65,38 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
public class StandardFlowService implements FlowService, ProtocolHandler {
private static final String EVENT_CATEGORY = "Controller";
@@ -135,11 +138,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
*/
private NodeIdentifier nodeId;
- private final NiFiProperties nifiProperties;
-
// guardedBy rwLock
private boolean firstControllerInitialization = true;
+ private final NiFiProperties nifiProperties;
+
private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster because ";
private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);
@@ -434,9 +437,28 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
@Override
- public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+ public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
if (configuredForClustering) {
- final DataFlow proposedFlow = (dataFlow == null) ? createDataFlow() : dataFlow;
+ // Create the initial flow from disk if it exists, or from serializing the empty root group in flow controller
+ final DataFlow initialFlow = (dataFlow == null) ? createDataFlow() : dataFlow;
+ if (logger.isTraceEnabled()) {
+ logger.trace("InitialFlow = " + new String(initialFlow.getFlow(), StandardCharsets.UTF_8));
+ }
+
+ // Sync the initial flow into the flow controller so that if the flow came from disk we loaded the
+ // whole flow into the flow controller and applied any bundle upgrades
+ writeLock.lock();
+ try {
+ loadFromBytes(initialFlow, true);
+ } finally {
+ writeLock.unlock();
+ }
+
+ // Get the proposed flow by serializing the flow controller which now has the synced version from above
+ final DataFlow proposedFlow = createDataFlowFromController();
+ if (logger.isTraceEnabled()) {
+ logger.trace("ProposedFlow = " + new String(proposedFlow.getFlow(), StandardCharsets.UTF_8));
+ }
/*
* Attempt to connect to the cluster. If the manager is able to
@@ -457,9 +479,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
if (response == null || response.shouldTryLater()) {
logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received.");
- // load local proposed flow
- loadFromBytes(proposedFlow, false);
-
// set node ID on controller before we start heartbeating because heartbeat needs node ID
controller.setNodeId(nodeId);
clusterCoordinator.setLocalNodeIdentifier(nodeId);
@@ -479,6 +498,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
*/
controller.startHeartbeating();
+ // Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
+ initializeController();
+
// notify controller that flow is initialized
try {
controller.onFlowInitialized(autoResumeState);
@@ -491,21 +513,26 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
} else {
try {
loadFromConnectionResponse(response);
- dao.save(controller, true);
} catch (final Exception e) {
logger.error("Failed to load flow from cluster due to: " + e, e);
handleConnectionFailure(e);
throw new IOException(e);
}
}
+
+ // save the flow in the controller so we write out the latest flow with any updated bundles to disk
+ dao.save(controller, true);
+
} finally {
writeLock.unlock();
}
} else {
writeLock.lock();
try {
- // operating in standalone mode, so load proposed flow
+ // operating in standalone mode, so load proposed flow and initialize the controller
loadFromBytes(dataFlow, true);
+ initializeController();
+ dao.save(controller, true);
} finally {
writeLock.unlock();
}
@@ -516,6 +543,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
DisconnectionCode disconnectionCode;
if (ex instanceof UninheritableFlowException) {
disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
+ } else if (ex instanceof MissingBundleException) {
+ disconnectionCode = DisconnectionCode.MISSING_BUNDLE;
} else if (ex instanceof FlowSynchronizationException) {
disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
} else {
@@ -533,7 +562,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// create the response
final FlowResponseMessage response = new FlowResponseMessage();
- response.setDataFlow(createDataFlow());
+ response.setDataFlow(createDataFlowFromController());
return response;
} catch (final Exception ex) {
throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, ex);
@@ -549,15 +578,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
@Override
public StandardDataFlow createDataFlow() throws IOException {
- final byte[] snippetBytes = controller.getSnippetManager().export();
- final byte[] authorizerFingerprint = getAuthorizerFingerprint();
-
// Load the flow from disk if the file exists.
if (dao.isFlowPresent()) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
dao.load(baos);
final byte[] bytes = baos.toByteArray();
- final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint);
+
+ final byte[] snippetBytes = controller.getSnippetManager().export();
+ final byte[] authorizerFingerprint = getAuthorizerFingerprint();
+ final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint, new HashSet<>());
return fromDisk;
}
@@ -566,14 +595,28 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// will automatically create a Root Process Group, and we need to ensure that
// we replicate that Process Group to all nodes in the cluster, so that they all
// end up with the same ID for the root Process Group.
+ return createDataFlowFromController();
+ }
+
+ @Override
+ public StandardDataFlow createDataFlowFromController() throws IOException {
+ final byte[] snippetBytes = controller.getSnippetManager().export();
+ final byte[] authorizerFingerprint = getAuthorizerFingerprint();
+
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
dao.save(controller, baos);
final byte[] flowBytes = baos.toByteArray();
baos.reset();
- return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint);
+ final Set<String> missingComponents = new HashSet<>();
+ controller.getRootGroup().findAllProcessors().stream().filter(p -> p.isExtensionMissing()).forEach(p -> missingComponents.add(p.getIdentifier()));
+ controller.getAllControllerServices().stream().filter(cs -> cs.isExtensionMissing()).forEach(cs -> missingComponents.add(cs.getIdentifier()));
+ controller.getAllReportingTasks().stream().filter(r -> r.isExtensionMissing()).forEach(r -> missingComponents.add(r.getIdentifier()));
+
+ return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, missingComponents);
}
+
private NodeIdentifier getNodeId() {
readLock.lock();
try {
@@ -593,7 +636,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
if (connectionResponse.getDataFlow() == null) {
logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow.");
- connectionResponse = connect(false, false, createDataFlow());
+ connectionResponse = connect(false, false, createDataFlowFromController());
}
loadFromConnectionResponse(connectionResponse);
@@ -623,7 +666,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
writeLock.lock();
try {
- logger.info("Disconnecting node.");
+ logger.info("Disconnecting node due to " + explanation);
// mark node as not connected
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.UNKNOWN, explanation));
@@ -638,7 +681,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setClustered(false, null);
clusterCoordinator.setConnected(false);
- logger.info("Node disconnected.");
+ logger.info("Node disconnected due to " + explanation);
} finally {
writeLock.unlock();
@@ -647,31 +690,30 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// write lock must already be acquired
private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow)
- throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+ throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
logger.trace("Loading flow from bytes");
// resolve the given flow (null means load flow from disk)
final DataFlow actualProposedFlow;
final byte[] flowBytes;
final byte[] authorizerFingerprint;
+ final Set<String> missingComponents;
+
if (proposedFlow == null) {
final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
copyCurrentFlow(flowOnDisk);
flowBytes = flowOnDisk.toByteArray();
authorizerFingerprint = getAuthorizerFingerprint();
+ missingComponents = new HashSet<>();
logger.debug("Loaded Flow from bytes");
} else {
flowBytes = proposedFlow.getFlow();
authorizerFingerprint = proposedFlow.getAuthorizerFingerprint();
+ missingComponents = proposedFlow.getMissingComponents();
logger.debug("Loaded flow from proposed flow");
}
- actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint);
-
- if (firstControllerInitialization) {
- // load the controller services
- logger.debug("Loading controller services");
- }
+ actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint, missingComponents);
// load the flow
logger.debug("Loading proposed flow into FlowController");
@@ -682,6 +724,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
}
+
+
final List<Template> templates = loadTemplates();
for (final Template template : templates) {
final Template existing = rootGroup.getTemplate(template.getIdentifier());
@@ -692,16 +736,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName());
}
}
-
- // lazy initialization of controller tasks and flow
- if (firstControllerInitialization) {
- logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
-
- // initialize the flow
- controller.initializeFlow();
-
- firstControllerInitialization = false;
- }
}
/**
@@ -867,6 +901,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// get the dataflow from the response
final DataFlow dataFlow = response.getDataFlow();
+ if (logger.isTraceEnabled()) {
+ logger.trace("ResponseFlow = " + new String(dataFlow.getFlow(), StandardCharsets.UTF_8));
+ }
// load new controller state
loadFromBytes(dataFlow, true);
@@ -884,6 +921,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+ // Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
+ initializeController();
+
// start the processors as indicated by the dataflow
controller.onFlowInitialized(autoResumeState);
@@ -892,6 +932,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.startHeartbeating();
} catch (final UninheritableFlowException ufe) {
throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
+ } catch (final MissingBundleException mbe) {
+ throw new MissingBundleException(CONNECTION_EXCEPTION_MSG_PREFIX + "cluster flow contains bundles that do not exist on the current node", mbe);
} catch (final FlowSerializationException fse) {
throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse);
} catch (final FlowSynchronizationException fse) {
@@ -905,6 +947,14 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
+ private void initializeController() throws IOException {
+ if (firstControllerInitialization) {
+ logger.debug("First controller initialization, initializing controller...");
+ controller.initializeFlow();
+ firstControllerInitialization = false;
+ }
+ }
+
@Override
public void copyCurrentFlow(final OutputStream os) throws IOException {
readLock.lock();
@@ -939,9 +989,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
@Override
public void run() {
- final ClassLoader currentCl = Thread.currentThread().getContextClassLoader();
- final ClassLoader cl = NarClassLoaders.getInstance().getFrameworkClassLoader();
- Thread.currentThread().setContextClassLoader(cl);
+ ClassLoader currentCl = null;
+
+ final Bundle frameworkBundle = NarClassLoaders.getInstance().getFrameworkBundle();
+ if (frameworkBundle != null) {
+ currentCl = Thread.currentThread().getContextClassLoader();
+ final ClassLoader cl = frameworkBundle.getClassLoader();
+ Thread.currentThread().setContextClassLoader(cl);
+ }
+
try {
//Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again
final SaveHolder holder = StandardFlowService.this.saveHolder.get();