You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/11/06 16:24:09 UTC
[8/9] nifi git commit: NIFI-5769: Refactored FlowController to use
Composition over Inheritance - Ensure that when root group is set,
that we register its ID in FlowManager
http://git-wip-us.apache.org/repos/asf/nifi/blob/931bb0bc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 680962e..c538044 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,28 +16,19 @@
*/
package org.apache.nifi.controller;
-import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.annotation.configuration.DefaultSettings;
-import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
-import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
-import org.apache.nifi.authorization.resource.DataAuthorizable;
-import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.util.IdentityMapping;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
-import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
@@ -52,8 +43,6 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.TriggerValidationTask;
@@ -63,19 +52,15 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.exception.ComponentLifeCycleException;
-import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flow.StandardFlowManager;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.ConnectionEventListener;
@@ -97,19 +82,15 @@ import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
-import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
-import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
-import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.QueueProvider;
-import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.StandardQueueProvider;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
@@ -120,7 +101,6 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
-import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
@@ -131,20 +111,12 @@ import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
-import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.PortStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.RunStatus;
-import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -162,92 +134,47 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
-import org.apache.nifi.history.History;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.ControllerServiceLogObserver;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.logging.LogRepository;
-import org.apache.nifi.logging.LogRepositoryFactory;
-import org.apache.nifi.logging.ProcessorLogObserver;
-import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
-import org.apache.nifi.processor.GhostProcessor;
import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.SimpleProcessLogger;
-import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.processor.StandardProcessorInitializationContext;
-import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.provenance.ComponentIdentifierLookup;
import org.apache.nifi.provenance.IdentifierLookup;
import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.StandardVersionControlInformation;
-import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
-import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceManager;
import org.apache.nifi.remote.RemoteSiteListener;
-import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.SocketRemoteSiteListener;
-import org.apache.nifi.remote.StandardRemoteProcessGroup;
-import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.StandardRootGroupPort;
-import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.cluster.NodeInformant;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.reporting.EventAccess;
-import org.apache.nifi.reporting.GhostReportingTask;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.reporting.StandardEventAccess;
+import org.apache.nifi.reporting.UserAwareEventAccess;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
-import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.web.ResourceNotFoundException;
-import org.apache.nifi.web.api.dto.BatchSettingsDTO;
-import org.apache.nifi.web.api.dto.BundleDTO;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RelationshipDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
@@ -255,28 +182,25 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -288,8 +212,7 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
-public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
- QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
+public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -303,8 +226,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
- public static final String ROOT_GROUP_ID_ALIAS = "root";
- public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
// default properties for scaling the positions of components from pre-1.0 flow encoding versions.
public static final double DEFAULT_POSITION_SCALE_FACTOR_X = 1.5;
@@ -338,9 +259,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final ComponentStatusRepository componentStatusRepository;
private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
- private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
private final VariableRegistry variableRegistry;
- private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
private final ConnectionLoadBalanceServer loadBalanceServer;
private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
@@ -366,7 +285,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final Integer remoteInputHttpPort;
private final Boolean isSiteToSiteSecure;
- private final AtomicReference<ProcessGroup> rootGroupRef = new AtomicReference<>();
private final List<Connectable> startConnectablesAfterInitialization;
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
private final LeaderElectionManager leaderElectionManager;
@@ -374,6 +292,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final FlowRegistryClient flowRegistryClient;
private final FlowEngine validationThreadPool;
private final ValidationTrigger validationTrigger;
+ private final ReloadComponent reloadComponent;
+ private final ProvenanceAuthorizableFactory provenanceAuthorizableFactory;
+ private final UserAwareEventAccess eventAccess;
+ private final StandardFlowManager flowManager;
/**
* true if controller is configured to operate in a clustered environment
@@ -534,8 +456,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.variableRegistry = variableRegistry == null ? VariableRegistry.EMPTY_REGISTRY : variableRegistry;
try {
+ this.provenanceAuthorizableFactory = new StandardProvenanceAuthorizableFactory(this);
this.provenanceRepository = createProvenanceRepository(nifiProperties);
- this.provenanceRepository.initialize(createEventReporter(bulletinRepository), authorizer, this, this);
+
+ final IdentifierLookup identifierLookup = new ComponentIdentifierLookup(this);
+
+ this.provenanceRepository.initialize(createEventReporter(), authorizer, provenanceAuthorizableFactory, identifierLookup);
} catch (final Exception e) {
throw new RuntimeException("Unable to create Provenance Repository", e);
}
@@ -557,8 +483,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
+ this.flowManager = new StandardFlowManager(nifiProperties, sslContext, this, flowFileEventRepository);
+
+ controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);
+
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
- eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
+ eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@@ -598,19 +528,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
this.snippetManager = new SnippetManager();
+ this.reloadComponent = new StandardReloadComponent(this);
- final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), this, processScheduler,
+ final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
nifiProperties, encryptor, this, new MutableVariableRegistry(this.variableRegistry));
- rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
+ rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
setRootGroup(rootGroup);
instanceId = ComponentIdGenerator.generateId().toString();
this.validationThreadPool = new FlowEngine(5, "Validate Components", true);
this.validationTrigger = new StandardValidationTrigger(validationThreadPool, this::isInitialized);
- controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository, stateManagerProvider,
- this.variableRegistry, this.nifiProperties, validationTrigger);
-
if (remoteInputSocketPort == null) {
LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set");
} else if (isSiteToSiteSecure && sslContext == null) {
@@ -655,12 +583,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
zooKeeperStateServer = null;
}
+ eventAccess = new StandardEventAccess(this, flowFileEventRepository);
componentStatusRepository = createComponentStatusRepository();
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
- componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
+ componentStatusRepository.capture(eventAccess.getControllerStatus(), getGarbageCollectionStatus());
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
@@ -704,7 +633,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
// Setup Load Balancing Server
- final EventReporter eventReporter = createEventReporter(bulletinRepository);
+ final EventReporter eventReporter = createEventReporter();
final List<IdentityMapping> identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties);
final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection);
@@ -766,20 +695,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- private static FlowFileSwapManager createSwapManager(final NiFiProperties properties, final ExtensionManager extensionManager) {
- final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
+ public FlowFileSwapManager createSwapManager() {
+ final String implementationClassName = nifiProperties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
if (implementationClassName == null) {
return null;
}
try {
- return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileSwapManager.class, properties);
+ final FlowFileSwapManager swapManager = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileSwapManager.class, nifiProperties);
+
+ final EventReporter eventReporter = createEventReporter();
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
+ @Override
+ public ResourceClaimManager getResourceClaimManager() {
+ return resourceClaimManager;
+ }
+
+ @Override
+ public FlowFileRepository getFlowFileRepository() {
+ return flowFileRepository;
+ }
+
+ @Override
+ public EventReporter getEventReporter() {
+ return eventReporter;
+ }
+ };
+
+ swapManager.initialize(initializationContext);
+ }
+
+ return swapManager;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
- private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
+ public EventReporter createEventReporter() {
return new EventReporter() {
private static final long serialVersionUID = 1L;
@@ -795,7 +748,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
// get all connections/queues and recover from swap files.
- final List<Connection> connections = getGroup(getRootGroupId()).findAllConnections();
+ final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
long maxIdFromSwapFiles = -1L;
if (flowFileRepository.isVolatile()) {
@@ -820,7 +773,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1);
+ flowFileRepository.loadFlowFiles(new StandardQueueProvider(this), maxIdFromSwapFiles + 1);
// Begin expiring FlowFiles that are old
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
@@ -858,7 +811,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
- final ProcessGroup rootGroup = getRootGroup();
+ final ProcessGroup rootGroup = flowManager.getRootGroup();
final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups();
allGroups.add(rootGroup);
@@ -879,14 +832,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
private void notifyComponentsConfigurationRestored() {
- for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) {
+ for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
final Processor processor = procNode.getProcessor();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
}
- for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+ for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
@@ -944,13 +897,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
};
- new TriggerValidationTask(this, triggerIfValidating).run();
+ new TriggerValidationTask(flowManager, triggerIfValidating).run();
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
LOG.info("Performed initial validation of all components in {} milliseconds", millis);
// Trigger component validation to occur every 5 seconds.
- validationThreadPool.scheduleWithFixedDelay(new TriggerValidationTask(this, validationTrigger), 5, 5, TimeUnit.SECONDS);
+ validationThreadPool.scheduleWithFixedDelay(new TriggerValidationTask(flowManager, validationTrigger), 5, 5, TimeUnit.SECONDS);
if (startDelayedComponents) {
LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
@@ -1005,7 +958,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
startRemoteGroupPortsAfterInitialization.clear();
}
- for (final Connection connection : getRootGroup().findAllConnections()) {
+ for (final Connection connection : flowManager.getRootGroup().findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing();
}
} finally {
@@ -1063,377 +1016,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- /**
- * Creates a connection between two Connectable objects.
- *
- * @param id required ID of the connection
- * @param name the name of the connection, or <code>null</code> to leave the
- * connection unnamed
- * @param source required source
- * @param destination required destination
- * @param relationshipNames required collection of relationship names
- * @return
- *
- * @throws NullPointerException if the ID, source, destination, or set of
- * relationships is null.
- * @throws IllegalArgumentException if <code>relationships</code> is an
- * empty collection
- */
- public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
- final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler);
-
- final List<Relationship> relationships = new ArrayList<>();
- for (final String relationshipName : requireNonNull(relationshipNames)) {
- relationships.add(new Relationship.Builder().name(relationshipName).build());
- }
-
- // Create and initialize a FlowFileSwapManager for this connection
- final FlowFileSwapManager swapManager = createSwapManager(nifiProperties, extensionManager);
- final EventReporter eventReporter = createEventReporter(getBulletinRepository());
-
- try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
- @Override
- public ResourceClaimManager getResourceClaimManager() {
- return resourceClaimManager;
- }
-
- @Override
- public FlowFileRepository getFlowFileRepository() {
- return flowFileRepository;
- }
-
- @Override
- public EventReporter getEventReporter() {
- return eventReporter;
- }
- };
-
- swapManager.initialize(initializationContext);
- }
-
- final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
- @Override
- public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
- final FlowFileQueue flowFileQueue;
-
- if (clusterCoordinator == null) {
- flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
- eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
- } else {
- flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
- clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
-
- flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
- flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
- }
-
- return flowFileQueue;
- }
- };
-
- final Connection connection = builder.id(requireNonNull(id).intern())
- .name(name == null ? null : name.intern())
- .relationships(relationships)
- .source(requireNonNull(source))
- .destination(destination)
- .flowFileQueueFactory(flowFileQueueFactory)
- .build();
-
- return connection;
- }
-
- /**
- * Creates a new Label
- *
- * @param id identifier
- * @param text label text
- * @return new label
- * @throws NullPointerException if either argument is null
- */
- public Label createLabel(final String id, final String text) {
- return new StandardLabel(requireNonNull(id).intern(), text);
- }
-
- /**
- * Creates a funnel
- *
- * @param id funnel id
- * @return new funnel
- */
- public Funnel createFunnel(final String id) {
- return new StandardFunnel(id.intern(), null, processScheduler);
- }
-
- /**
- * Creates a Port to use as an Input Port for a Process Group
- *
- * @param id port identifier
- * @param name port name
- * @return new port
- * @throws NullPointerException if the ID or name is not unique
- * @throws IllegalStateException if an Input Port already exists with the
- * same name or id.
- */
- public Port createLocalInputPort(String id, String name) {
- id = requireNonNull(id).intern();
- name = requireNonNull(name).intern();
- verifyPortIdDoesNotExist(id);
- return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler);
- }
-
- /**
- * Creates a Port to use as an Output Port for a Process Group
- *
- * @param id port id
- * @param name port name
- * @return new port
- * @throws NullPointerException if the ID or name is not unique
- * @throws IllegalStateException if an Input Port already exists with the
- * same name or id.
- */
- public Port createLocalOutputPort(String id, String name) {
- id = requireNonNull(id).intern();
- name = requireNonNull(name).intern();
- verifyPortIdDoesNotExist(id);
- return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler);
- }
-
- /**
- * Creates a ProcessGroup with the given ID
- *
- * @param id group id
- * @return new group
- * @throws NullPointerException if the argument is null
- */
- public ProcessGroup createProcessGroup(final String id) {
- return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, nifiProperties, encryptor, this, new MutableVariableRegistry(variableRegistry));
- }
-
- /**
- * <p>
- * Creates a new ProcessorNode with the given type and identifier and
- * initializes it invoking the methods annotated with {@link OnAdded}.
- * </p>
- *
- * @param type processor type
- * @param id processor id
- * @param coordinate the coordinate of the bundle for this processor
- * @return new processor
- * @throws NullPointerException if either arg is null
- * @throws ProcessorInstantiationException if the processor cannot be
- * instantiated for any reason
- */
- public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate) throws ProcessorInstantiationException {
- return createProcessor(type, id, coordinate, true);
- }
-
- /**
- * <p>
- * Creates a new ProcessorNode with the given type and identifier and
- * optionally initializes it.
- * </p>
- *
- * @param type the fully qualified Processor class name
- * @param id the unique ID of the Processor
- * @param coordinate the bundle coordinate for this processor
- * @param firstTimeAdded whether or not this is the first time this
- * Processor is added to the graph. If {@code true}, will invoke methods
- * annotated with the {@link OnAdded} annotation.
- * @return new processor node
- * @throws NullPointerException if either arg is null
- * @throws ProcessorInstantiationException if the processor cannot be
- * instantiated for any reason
- */
- public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) throws ProcessorInstantiationException {
- return createProcessor(type, id, coordinate, Collections.emptySet(), firstTimeAdded, true);
- }
-
- /**
- * <p>
- * Creates a new ProcessorNode with the given type and identifier and
- * optionally initializes it.
- * </p>
- *
- * @param type the fully qualified Processor class name
- * @param id the unique ID of the Processor
- * @param coordinate the bundle coordinate for this processor
- * @param firstTimeAdded whether or not this is the first time this
- * Processor is added to the graph. If {@code true}, will invoke methods
- * annotated with the {@link OnAdded} annotation.
- * @return new processor node
- * @throws NullPointerException if either arg is null
- * @throws ProcessorInstantiationException if the processor cannot be
- * instantiated for any reason
- */
- public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls,
- final boolean firstTimeAdded, final boolean registerLogObserver) throws ProcessorInstantiationException {
- id = id.intern();
-
- boolean creationSuccessful;
- LoggableComponent<Processor> processor;
-
- // make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
- final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
-
- try {
- processor = instantiateProcessor(type, id, coordinate, additionalUrls);
- creationSuccessful = true;
- } catch (final ProcessorInstantiationException pie) {
- LOG.error("Could not create Processor of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", pie);
- final GhostProcessor ghostProc = new GhostProcessor();
- ghostProc.setIdentifier(id);
- ghostProc.setCanonicalClassName(type);
- processor = new LoggableComponent<>(ghostProc, coordinate, null);
- creationSuccessful = false;
- }
-
- final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
- final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVarRegistry);
- final ProcessorNode procNode;
- if (creationSuccessful) {
- procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider,
- nifiProperties, componentVarRegistry, this, extensionManager, validationTrigger);
- } else {
- final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
- final String componentType = "(Missing) " + simpleClassName;
- procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider,
- componentType, type, nifiProperties, componentVarRegistry, this, extensionManager, validationTrigger, true);
- }
-
- if (registerLogObserver) {
- logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
- }
-
- try {
- final Class<?> procClass = procNode.getProcessor().getClass();
- if(procClass.isAnnotationPresent(DefaultSettings.class)) {
- DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class);
- try {
- procNode.setYieldPeriod(ds.yieldDuration());
- } catch(Throwable ex) {
- LOG.error(String.format("Error while setting yield period from DefaultSettings annotation:%s",ex.getMessage()),ex);
- }
- try {
- procNode.setPenalizationPeriod(ds.penaltyDuration());
- } catch(Throwable ex) {
- LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
- }
- // calling setBulletinLevel changes the level in the LogRepository so we only want to do this when
- // the caller said to register the log observer, otherwise we could be changing the level when we didn't mean to
- if (registerLogObserver) {
- try {
- procNode.setBulletinLevel(ds.bulletinLevel());
- } catch (Throwable ex) {
- LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s", ex.getMessage()), ex);
- }
- }
- }
- } catch (Throwable ex) {
- LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex);
- }
- if (firstTimeAdded) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
- } catch (final Exception e) {
- if (registerLogObserver) {
- logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
- }
- throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
- }
+ public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) {
+ final String principal = nifiProperties.getKerberosServicePrincipal();
+ final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
+ final File kerberosConfigFile = nifiProperties.getKerberosConfigurationFile();
- if (firstTimeAdded) {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
- }
- }
+ if (principal == null && keytabLocation == null && kerberosConfigFile == null) {
+ return KerberosConfig.NOT_CONFIGURED;
}
- return procNode;
+ final File keytabFile = keytabLocation == null ? null : new File(keytabLocation);
+ return new KerberosConfig(principal, keytabFile, kerberosConfigFile);
}
- private LoggableComponent<Processor> instantiateProcessor(final String type, final String identifier, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
- throws ProcessorInstantiationException {
-
- final Bundle processorBundle = extensionManager.getBundle(bundleCoordinate);
- if (processorBundle == null) {
- throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
- }
-
- final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- final ClassLoader detectedClassLoaderForInstance = extensionManager.createInstanceClassLoader(type, identifier, processorBundle, additionalUrls);
- final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForInstance);
- Thread.currentThread().setContextClassLoader(detectedClassLoaderForInstance);
-
- final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
- final Processor processor = processorClass.newInstance();
-
- final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor);
- final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
- final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, terminationAwareLogger, this, this, nifiProperties);
- processor.initialize(ctx);
-
- LogRepositoryFactory.getRepository(identifier).setLogger(terminationAwareLogger);
- return new LoggableComponent<>(processor, bundleCoordinate, terminationAwareLogger);
- } catch (final Throwable t) {
- throw new ProcessorInstantiationException(type, t);
- } finally {
- if (ctxClassLoader != null) {
- Thread.currentThread().setContextClassLoader(ctxClassLoader);
- }
- }
+ public ValidationTrigger getValidationTrigger() {
+ return validationTrigger;
}
- @Override
- public void reload(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
- throws ProcessorInstantiationException {
- if (existingNode == null) {
- throw new IllegalStateException("Existing ProcessorNode cannot be null");
- }
-
- final String id = existingNode.getProcessor().getIdentifier();
-
- // ghost components will have a null logger
- if (existingNode.getLogger() != null) {
- existingNode.getLogger().debug("Reloading component {} to type {} from bundle {}", new Object[]{id, newType, bundleCoordinate});
- }
-
- // createProcessor will create a new instance class loader for the same id so
- // save the instance class loader to use it for calling OnRemoved on the existing processor
- final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
-
- // create a new node with firstTimeAdded as true so lifecycle methods get fired
- // attempt the creation to make sure it works before firing the OnRemoved methods below
- final ProcessorNode newNode = createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false);
-
- // call OnRemoved for the existing processor using the previous instance class loader
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
- final StateManager stateManager = getStateManagerProvider().getStateManager(id);
- final StandardProcessContext processContext = new StandardProcessContext(existingNode, controllerServiceProvider, encryptor, stateManager, () -> false);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
- } finally {
- extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
- }
-
- // set the new processor in the existing node
- final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
- final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
- LogRepositoryFactory.getRepository(id).setLogger(terminationAwareLogger);
-
- final LoggableComponent<Processor> newProcessor = new LoggableComponent<>(newNode.getProcessor(), newNode.getBundleCoordinate(), terminationAwareLogger);
- existingNode.setProcessor(newProcessor);
- existingNode.setExtensionMissing(newNode.isExtensionMissing());
-
- // need to refresh the properties in case we are changing from ghost component to real component
- existingNode.refreshProperties();
-
- LOG.debug("Triggering async validation of {} due to processor reload", existingNode);
- validationTrigger.triggerAsync(existingNode);
+ public StringEncryptor getEncryptor() {
+ return encryptor;
}
/**
@@ -1476,117 +1081,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return authorizer;
}
- /**
- * Creates a Port to use as an Input Port for the root Process Group, which
- * is used for Site-to-Site communications
- *
- * @param id port id
- * @param name port name
- * @return new port
- * @throws NullPointerException if the ID or name is not unique
- * @throws IllegalStateException if an Input Port already exists with the
- * same name or id.
- */
- public Port createRemoteInputPort(String id, String name) {
- id = requireNonNull(id).intern();
- name = requireNonNull(name).intern();
- verifyPortIdDoesNotExist(id);
- return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
- authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure), nifiProperties);
- }
-
- /**
- * Creates a Port to use as an Output Port for the root Process Group, which
- * is used for Site-to-Site communications and will queue flow files waiting
- * to be delivered to remote instances
- *
- * @param id port id
- * @param name port name
- * @return new port
- * @throws NullPointerException if the ID or name is not unique
- * @throws IllegalStateException if an Input Port already exists with the
- * same name or id.
- */
- public Port createRemoteOutputPort(String id, String name) {
- id = requireNonNull(id).intern();
- name = requireNonNull(name).intern();
- verifyPortIdDoesNotExist(id);
- return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
- authorizer, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure), nifiProperties);
- }
-
- /**
- * Creates a new Remote Process Group with the given ID that points to the
- * given URI
- *
- * @param id group id
- * @param uris group uris, multiple url can be specified in comma-separated format
- * @return new group
- * @throws NullPointerException if either argument is null
- * @throws IllegalArgumentException if <code>uri</code> is not a valid URI.
- */
- public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
- return new StandardRemoteProcessGroup(requireNonNull(id).intern(), uris, null, this, sslContext, nifiProperties);
- }
-
- public ProcessGroup getRootGroup() {
- return rootGroupRef.get();
- }
-
- /**
- * Verifies that no output port exists with the given id or name. If this
- * does not hold true, throws an IllegalStateException
- *
- * @param id port identifier
- * @throws IllegalStateException port already exists
- */
- private void verifyPortIdDoesNotExist(final String id) {
- final ProcessGroup rootGroup = getRootGroup();
- Port port = rootGroup.findOutputPort(id);
- if (port != null) {
- throw new IllegalStateException("An Input Port already exists with ID " + id);
- }
- port = rootGroup.findInputPort(id);
- if (port != null) {
- throw new IllegalStateException("An Input Port already exists with ID " + id);
- }
- }
-
- /**
- * @return the name of this controller, which is also the name of the Root
- * Group.
- */
- public String getName() {
- return getRootGroup().getName();
- }
-
- /**
- * Sets the name for the Root Group, which also changes the name for the
- * controller.
- *
- * @param name of root group
- */
- public void setName(final String name) {
- getRootGroup().setName(name);
- }
-
- /**
- * @return the comments of this controller, which is also the comment of the
- * Root Group
- */
- public String getComments() {
- return getRootGroup().getComments();
- }
-
- /**
- * Sets the comments
- *
- * @param comments for the Root Group, which also changes the comment for
- * the controller
- */
- public void setComments(final String comments) {
- getRootGroup().setComments(comments);
- }
/**
* @return <code>true</code> if the scheduling engine for this controller
@@ -1615,7 +1109,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
public void shutdown(final boolean kill) {
this.shutdown = true;
- stopAllProcessors();
+ flowManager.getRootGroup().stopProcessing();
readLock.lock();
try {
@@ -1654,14 +1148,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop);
// Trigger any processors' methods marked with @OnShutdown to be called
- getRootGroup().shutdown();
+ flowManager.getRootGroup().shutdown();
stateManagerProvider.shutdown();
// invoke any methods annotated with @OnShutdown on Controller Services
- for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(
- extensionManager, serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
+ for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
+ final Class<?> serviceImplClass = serviceNode.getControllerServiceImplementation().getClass();
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
}
@@ -1864,11 +1358,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- /**
- * @return the ID of the root group
- */
- public String getRootGroupId() {
- return getRootGroup().getIdentifier();
+ public UserAwareEventAccess getEventAccess() {
+ return eventAccess;
}
/**
@@ -1887,7 +1378,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
- rootGroupRef.set(group);
+ flowManager.setRootGroup(group);
for (final RemoteSiteListener listener : externalSiteListeners) {
listener.setRootGroup(group);
}
@@ -1895,7 +1386,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
allProcessGroups.put(group.getIdentifier(), group);
- allProcessGroups.put(ROOT_GROUP_ID_ALIAS, group);
+ allProcessGroups.put(FlowManager.ROOT_GROUP_ID_ALIAS, group);
} finally {
writeLock.unlock("setRootGroup");
}
@@ -1921,38 +1412,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
//
// ProcessGroup access
//
- /**
- * Updates the process group corresponding to the specified DTO. Any field
- * in DTO that is <code>null</code> (with the exception of the required ID)
- * will be ignored.
- *
- * @param dto group
- * @throws ProcessorInstantiationException
- *
- * @throws IllegalStateException if no process group can be found with the
- * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
- * specified is invalid, or if the DTO's Parent Group ID changes but the
- * parent group has incoming or outgoing connections
- *
- * @throws NullPointerException if the DTO or its ID is null
- */
- public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException {
- final ProcessGroup group = lookupGroup(requireNonNull(dto).getId());
-
- final String name = dto.getName();
- final PositionDTO position = dto.getPosition();
- final String comments = dto.getComments();
-
- if (name != null) {
- group.setName(name);
- }
- if (position != null) {
- group.setPosition(toPosition(position));
- }
- if (comments != null) {
- group.setComments(comments);
- }
- }
private Position toPosition(final PositionDTO dto) {
return new Position(dto.getX(), dto.getY());
@@ -1961,1549 +1420,153 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
//
// Snippet
//
- /**
- * Creates an instance of the given snippet and adds the components to the
- * given group
- *
- * @param group group
- * @param dto dto
- *
- * @throws NullPointerException if either argument is null
- * @throws IllegalStateException if the snippet is not valid because a
- * component in the snippet has an ID that is not unique to this flow, or
- * because it shares an Input Port or Output Port at the root level whose
- * name already exists in the given ProcessGroup, or because the Template
- * contains a Processor or a Prioritizer whose class is not valid within
- * this instance of NiFi.
- * @throws ProcessorInstantiationException if unable to instantiate a
- * processor
- */
- public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
- instantiateSnippet(group, dto, true);
- group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
+
+ private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
+ final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+ if (!supportedBundles.contains(requiredCoordinate)) {
+ throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
+ }
}
- private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {
- validateSnippetContents(requireNonNull(group), dto);
- writeLock.lock();
- try {
- //
- // Instantiate Controller Services
- //
- final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
- try {
- for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
- final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
- final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
- serviceNode.pauseValidationTrigger();
- serviceNodes.add(serviceNode);
-
- serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
- serviceNode.setComments(controllerServiceDTO.getComments());
- serviceNode.setName(controllerServiceDTO.getName());
- if (!topLevel) {
- serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
- }
- group.addControllerService(serviceNode);
+ private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+ if (versionedFlow.getProcessors() != null) {
+ versionedFlow.getProcessors().forEach(processor -> {
+ if (processor.getBundle() == null) {
+ throw new IllegalArgumentException("Processor bundle must be specified.");
}
- // configure controller services. We do this after creating all of them in case 1 service
- // references another service.
- for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
- final String serviceId = controllerServiceDTO.getId();
- final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
- serviceNode.setProperties(controllerServiceDTO.getProperties());
+ if (supportedTypes.containsKey(processor.getType())) {
+ verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
+ } else {
+ throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
}
- } finally {
- serviceNodes.stream().forEach(ControllerServiceNode::resumeValidationTrigger);
- }
-
- //
- // Instantiate the labels
- //
- for (final LabelDTO labelDTO : dto.getLabels()) {
- final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel());
- label.setPosition(toPosition(labelDTO.getPosition()));
- if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
- label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
- }
-
- label.setStyle(labelDTO.getStyle());
- if (!topLevel) {
- label.setVersionedComponentId(labelDTO.getVersionedComponentId());
- }
-
- group.addLabel(label);
- }
-
- // Instantiate the funnels
- for (final FunnelDTO funnelDTO : dto.getFunnels()) {
- final Funnel funnel = createFunnel(funnelDTO.getId());
- funnel.setPosition(toPosition(funnelDTO.getPosition()));
- if (!topLevel) {
- funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
- }
-
- group.addFunnel(funnel);
- }
-
- //
- // Instantiate Input Ports & Output Ports
- //
- for (final PortDTO portDTO : dto.getInputPorts()) {
- final Port inputPort;
- if (group.isRootGroup()) {
- inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName());
- inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
- if (portDTO.getGroupAccessControl() != null) {
- ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
- }
- if (portDTO.getUserAccessControl() != null) {
- ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl());
- }
- } else {
- inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName());
- }
-
- if (!topLevel) {
- inputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
- }
- inputPort.setPosition(toPosition(portDTO.getPosition()));
- inputPort.setProcessGroup(group);
- inputPort.setComments(portDTO.getComments());
- group.addInputPort(inputPort);
- }
-
- for (final PortDTO portDTO : dto.getOutputPorts()) {
- final Port outputPort;
- if (group.isRootGroup()) {
- outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName());
- outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
- if (portDTO.getGroupAccessControl() != null) {
- ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl());
- }
- if (portDTO.getUserAccessControl() != null) {
- ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl());
- }
- } else {
- outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName());
- }
-
- if (!topLevel) {
- outputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
- }
- outputPort.setPosition(toPosition(portDTO.getPosition()));
- outputPort.setProcessGroup(group);
- outputPort.setComments(portDTO.getComments());
- group.addOutputPort(outputPort);
- }
-
- //
- // Instantiate the processors
- //
- for (final ProcessorDTO processorDTO : dto.getProcessors()) {
- final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, processorDTO.getType(), processorDTO.getBundle());
- final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
- procNode.pauseValidationTrigger();
-
- try {
- procNode.setPosition(toPosition(processorDTO.getPosition()));
- procNode.setProcessGroup(group);
- if (!topLevel) {
- procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
- }
-
- final ProcessorConfigDTO config = processorDTO.getConfig();
- procNode.setComments(config.getComments());
- if (config.isLossTolerant() != null) {
- procNode.setLossTolerant(config.isLossTolerant());
- }
- procNode.setName(processorDTO.getName());
-
- procNode.setYieldPeriod(config.getYieldDuration());
- procNode.setPenalizationPeriod(config.getPenaltyDuration());
- procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
- procNode.setAnnotationData(config.getAnnotationData());
- procNode.setStyle(processorDTO.getStyle());
-
- if (config.getRunDurationMillis() != null) {
- procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
- }
-
- if (config.getSchedulingStrategy() != null) {
- procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
- }
-
- if (config.getExecutionNode() != null) {
- procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
- }
-
- if (processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
- procNode.disable();
- }
-
- // ensure that the scheduling strategy is set prior to these values
- procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
- procNode.setScheduldingPeriod(config.getSchedulingPeriod());
-
- final Set<Relationship> relationships = new HashSet<>();
- if (processorDTO.getRelationships() != null) {
- for (final RelationshipDTO rel : processorDTO.getRelationships()) {
- if (rel.isAutoTerminate()) {
- relationships.add(procNode.getRelationship(rel.getName()));
- }
- }
- procNode.setAutoTerminatedRelationships(relationships);
- }
-
- if (config.getProperties() != null) {
- procNode.setProperties(config.getProperties());
- }
-
- group.addProcessor(procNode);
- } finally {
- procNode.resumeValidationTrigger();
- }
- }
-
- //
- // Instantiate Remote Process Groups
- //
- for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
- final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris());
- remoteGroup.setComments(remoteGroupDTO.getComments());
- remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
- remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
- remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
- if (!topLevel) {
- remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId());
- }
-
- if (remoteGroupDTO.getTransportProtocol() == null) {
- remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW);
- } else {
- remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(remoteGroupDTO.getTransportProtocol()));
- }
-
- remoteGroup.setProxyHost(remoteGroupDTO.getProxyHost());
- remoteGroup.setProxyPort(remoteGroupDTO.getProxyPort());
- remoteGroup.setProxyUser(remoteGroupDTO.getProxyUser());
- remoteGroup.setProxyPassword(remoteGroupDTO.getProxyPassword());
- remoteGroup.setProcessGroup(group);
-
- // set the input/output ports
- if (remoteGroupDTO.getContents() != null) {
- final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
-
- // ensure there are input ports
- if (contents.getInputPorts() != null) {
- remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
- }
-
- // ensure there are output ports
- if (contents.getOutputPorts() != null) {
- remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
- }
- }
-
- group.addRemoteProcessGroup(remoteGroup);
- }
-
- //
- // Instantiate ProcessGroups
- //
- for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
- final ProcessGroup childGroup = createProcessGroup(groupDTO.getId());
- childGroup.setParent(group);
- childGroup.setPosition(toPosition(groupDTO.getPosition()));
- childGroup.setComments(groupDTO.getComments());
- childGroup.setName(groupDTO.getName());
- if (groupDTO.getVariables() != null) {
- childGroup.setVariables(groupDTO.getVariables());
- }
-
- // If this Process Group is 'top level' then we do not set versioned component ID's.
- // We do this only if this component is the child of a Versioned Component.
- if (!topLevel) {
- childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId());
- }
-
- group.addProcessGroup(childGroup);
-
- final FlowSnippetDTO contents = groupDTO.getContents();
-
- // we want this to be recursive, so we will create a new template that contains only
- // the contents of this child group and recursively call ourselves.
- final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
- childTemplateDTO.setConnections(contents.getConnections());
- childTemplateDTO.setInputPorts(contents.getInputPorts());
- childTemplateDTO.setLabels(contents.getLabels());
- childTemplateDTO.setOutputPorts(contents.getOutputPorts());
- childTemplateDTO.setProcessGroups(contents.getProcessGroups());
- childTemplateDTO.setProcessors(contents.getProcessors());
- childTemplateDTO.setFunnels(contents.getFunnels());
- childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
- childTemplateDTO.setControllerServices(contents.getControllerServices());
- instantiateSnippet(childGroup, childTemplateDTO, false);
-
- if (groupDTO.getVersionControlInformation() != null) {
- final VersionControlInformation vci = StandardVersionControlInformation.Builder
- .fromDto(groupDTO.getVersionControlInformation())
- .build();
- childGroup.setVersionControlInformation(vci, Collections.emptyMap());
- }
- }
-
- //
- // Instantiate Connections
- //
- for (final ConnectionDTO connectionDTO : dto.getConnections()) {
- final ConnectableDTO sourceDTO = connectionDTO.getSource();
- final ConnectableDTO destinationDTO = connectionDTO.getDestination();
- final Connectable source;
- final Connectable destination;
-
- // locate the source and destination connectable. if this is a remote port
- // we need to locate the remote process groups. otherwise we need to
- // find the connectable given its parent group.
- // NOTE: (getConnectable returns ANY connectable, when the parent is
- // not this group only input ports or output ports should be returned. if something
- // other than a port is returned, an exception will be thrown when adding the
- // connection below.)
- // see if the source connectable is a remote port
- if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
- final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId());
- source = remoteGroup.getOutputPort(sourceDTO.getId());
- } else {
- final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId());
- source = sourceGroup.getConnectable(sourceDTO.getId());
- }
-
- // see if the destination connectable is a remote port
- if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
- final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId());
- destination = remoteGroup.getInputPort(destinationDTO.getId());
- } else {
- final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId());
- destination = destinationGroup.getConnectable(destinationDTO.getId());
- }
-
- // determine the selection relationships for this connection
- final Set<String> relationships = new HashSet<>();
- if (connectionDTO.getSelectedRelationships() != null) {
- relationships.addAll(connectionDTO.getSelectedRelationships());
- }
-
- final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
- if (!topLevel) {
- connection.setVersionedComponentId(connectionDTO.getVersionedComponentId());
- }
-
- if (connectionDTO.getBends() != null) {
- final List<Position> bendPoints = new ArrayList<>();
- for (final PositionDTO bend : connectionDTO.getBends()) {
- bendPoints.add(new Position(bend.getX(), bend.getY()));
- }
- connection.setBendPoints(bendPoints);
- }
-
- final FlowFileQueue queue = connection.getFlowFileQueue();
- queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
- queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
- queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
-
- final List<String> prioritizers = connectionDTO.getPrioritizers();
- if (prioritizers != null) {
- final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
- final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>();
- for (final String className : newPrioritizersClasses) {
- try {
- newPrioritizers.add(createPrioritizer(className));
- } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
- }
- }
- queue.setPriorities(newPrioritizers);
- }
-
- final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
- if (loadBalanceStrategyName != null) {
- final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
- final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute();
- queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute);
- }
-
- connection.setProcessGroup(group);
- group.addConnection(connection);
- }
- } finally {
- writeLock.unlock("instantiateSnippet");
- }
- }
-
- /**
- * Converts a set of ports into a set of remote process group ports.
- *
- * @param ports ports
- * @return group descriptors
- */
- private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) {
- Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
- if (ports != null) {
- remotePorts = new LinkedHashSet<>(ports.size());
- for (final RemoteProcessGroupPortDTO port : ports) {
- final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
- descriptor.setId(port.getId());
- descriptor.setVersionedComponentId(port.getVersionedComponentId());
- descriptor.setTargetId(port.getTargetId());
- descriptor.setName(port.getName());
- descriptor.setComments(port.getComments());
- descriptor.setTargetRunning(port.isTargetRunning());
- descriptor.setConnected(port.isConnected());
- descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
- descriptor.setTransmitting(port.isTransmitting());
- descriptor.setUseCompression(port.getUseCompression());
- final BatchSettingsDTO batchSettings = port.getBatchSettings();
- if (batchSettings != null) {
- descriptor.setBatchCount(batchSettings.getCount());
- descriptor.setBatchSize(batchSettings.getSize());
- descriptor.setBatchDuration(batchSettings.getDuration());
- }
- remotePorts.add(descriptor);
- }
- }
- return remotePorts;
- }
-
- /**
- * Returns the parent of the specified Connectable. This only considers this
- * group and any direct child sub groups.
- *
- * @param parentGroupId group id
- * @return parent group
- */
- private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) {
- if (areGroupsSame(group.getIdentifier(), parentGroupId)) {
- return group;
- } else {
- return group.getProcessGroup(parentGroupId);
- }
- }
-
- private void verifyBundleInSnippet(final BundleDTO requiredBundle, final Set<BundleCoordinate> supportedBundles) {
- final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
- if (!supportedBundles.contains(requiredCoordinate)) {
- throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
- }
- }
-
- private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
- final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
- if (!supportedBundles.contains(requiredCoordinate)) {
- throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
- }
- }
-
- private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
- if (templateContents.getProcessors() != null) {
- templateContents.getProcessors().forEach(processor -> {
- if (processor.getBundle() == null) {
- throw new IllegalArgumentException("Processor bundle must be specified.");
- }
-
- if (supportedTypes.containsKey(processor.getType())) {
- verifyBundleInSnippet(processor.getBundle(), supportedTypes.get(processor.getType()));
- } else {
- throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
- }
- });
- }
-
- if (templateContents.getProcessGroups() != null) {
- templateContents.getProcessGroups().forEach(processGroup -> {
- verifyProcessorsInSnippet(processGroup.getContents(), supportedTypes);
- });
- }
- }
-
- private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
- if (versionedFlow.getProcessors() != null) {
- versionedFlow.getProcessors().forEach(processor -> {
- if (processor.getBundle() == null) {
- throw new IllegalArgumentException("Processor bundle must be specified.");
- }
-
- if (supportedTypes.containsKey(processor.getType())) {
- verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
- } else {
- throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
- }
- });
- }
-
- if (versionedFlow.getProcessGroups() != null) {
- versionedFlow.getProcessGroups().forEach(processGroup -> {
- verifyProcessorsInVersionedFlow(processGroup, supportedTypes);
- });
- }
- }
-
- private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
- if (templateContents.getControllerServices() != null) {
- templateContents.getControllerServices().forEach(controllerService -> {
- if (supportedTypes.containsKey(controllerService.getType())) {
- if (controllerService.getBundle() == null) {
- throw new IllegalArgumentException("Controller Service bundle must be specified.");
- }
-
- verifyBundleInSnippet(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
- } else {
- throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
- }
- });
- }
-
- if (templateContents.getProcessGroups() != null) {
- templateContents.getProcessGroups().forEach(processGroup -> {
- verifyControllerServicesInSnippet(processGroup.getContents(), supportedTypes);
- });
- }
- }
-
- private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
- if (versionedFlow.getControllerServices() != null) {
- versionedFlow.getControllerServices().forEach(controllerService -> {
- if (supportedTypes.containsKey(controllerService.getType())) {
- if (controllerService.getBundle() == null) {
- throw new IllegalArgumentException("Controller Service bundle must be specified.");
- }
-
- verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
- } else {
- throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
- }
- });
- }
-
- if (versionedFlow.getProcessGroups() != null) {
- versionedFlow.getProcessGroups().forEach(processGroup -> {
- verifyControllerServicesInVersionedFlow(processGroup, supportedTypes);
- });
- }
- }
-
- public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
- final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
- for (final Class<?> c : extensionManager.getExtensions(Processor.class)) {
- final String name = c.getName();
- processorClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
- }
- verifyProcessorsInSnippet(templateContents, processorClasses);
-
- final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
- for (final Class<?> c : extensionManager.getExtensions(ControllerService.class)) {
- final String name = c.getName();
- controllerServiceClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
- }
- verifyControllerServicesInSnippet(templateContents, controllerServiceClasses);
-
- final Set<String> prioritizerClasses = new HashSet<>();
- for (final Class<?> c : extensionManager.getExtensions(FlowFilePrioritizer.class)) {
- prioritizerClasses.add(c.ge
<TRUNCATED>