You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/11/01 18:41:44 UTC
[6/7] nifi git commit: NIFI-5673 Set up property/assembly for new
auto-load directory - Set up NarAutoLoader to watch directory for new files -
Move NarAutoLoader to JettyServer since it will need access to
ExtensionManager - Created NarLoader to shared [... subject truncated]
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 69bdf2a..680962e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -441,7 +441,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final StringEncryptor encryptor,
final BulletinRepository bulletinRepo,
final VariableRegistry variableRegistry,
- final FlowRegistryClient flowRegistryClient) {
+ final FlowRegistryClient flowRegistryClient,
+ final ExtensionManager extensionManager) {
return new FlowController(
flowFileEventRepo,
@@ -456,7 +457,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/* heartbeat monitor */ null,
/* leader election manager */ null,
/* variable registry */ variableRegistry,
- flowRegistryClient);
+ flowRegistryClient,
+ extensionManager);
}
public static FlowController createClusteredInstance(
@@ -471,7 +473,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
final VariableRegistry variableRegistry,
- final FlowRegistryClient flowRegistryClient) {
+ final FlowRegistryClient flowRegistryClient,
+ final ExtensionManager extensionManager) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
@@ -486,7 +489,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeatMonitor,
leaderElectionManager,
variableRegistry,
- flowRegistryClient);
+ flowRegistryClient,
+ extensionManager);
return flowController;
}
@@ -505,7 +509,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
final VariableRegistry variableRegistry,
- final FlowRegistryClient flowRegistryClient) {
+ final FlowRegistryClient flowRegistryClient,
+ final ExtensionManager extensionManager) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(5);
@@ -513,14 +518,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.encryptor = encryptor;
this.nifiProperties = nifiProperties;
this.heartbeatMonitor = heartbeatMonitor;
- sslContext = SslContextFactory.createSslContext(nifiProperties);
- extensionManager = new ExtensionManager();
+ this.sslContext = SslContextFactory.createSslContext(nifiProperties);
+ this.extensionManager = extensionManager;
this.clusterCoordinator = clusterCoordinator;
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
- final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, resourceClaimManager);
+ final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, extensionManager, resourceClaimManager);
flowFileRepository = flowFileRepo;
flowFileEventRepository = flowFileEventRepo;
counterRepositoryRef = new AtomicReference<>(new StandardCounterRepository());
@@ -542,7 +547,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- this.stateManagerProvider = StandardStateManagerProvider.create(nifiProperties, this.variableRegistry);
+ this.stateManagerProvider = StandardStateManagerProvider.create(nifiProperties, this.variableRegistry, extensionManager);
} catch (final IOException e) {
throw new RuntimeException(e);
}
@@ -553,7 +558,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
- eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor);
+ eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@@ -743,7 +748,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return ResourceFactory.getControllerResource();
}
- private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
+ private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ExtensionManager extensionManager, final ResourceClaimManager contentClaimManager) {
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
@@ -751,7 +756,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class, properties);
+ final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileRepository.class, properties);
synchronized (created) {
created.initialize(contentClaimManager);
}
@@ -761,14 +766,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- private static FlowFileSwapManager createSwapManager(final NiFiProperties properties) {
+ private static FlowFileSwapManager createSwapManager(final NiFiProperties properties, final ExtensionManager extensionManager) {
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
if (implementationClassName == null) {
return null;
}
try {
- return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class, properties);
+ return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileSwapManager.class, properties);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -876,7 +881,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private void notifyComponentsConfigurationRestored() {
for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) {
final Processor processor = procNode.getProcessor();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
}
@@ -884,7 +889,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
@@ -892,7 +897,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ReportingTask task = taskNode.getReportingTask();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass(), task.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, task.getClass(), task.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
}
}
@@ -1020,7 +1025,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class, properties);
+ final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ContentRepository.class, properties);
synchronized (contentRepo) {
contentRepo.initialize(resourceClaimManager);
}
@@ -1038,7 +1043,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceRepository.class, properties);
+ return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ProvenanceRepository.class, properties);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -1052,7 +1057,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class, nifiProperties);
+ return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ComponentStatusRepository.class, nifiProperties);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -1083,7 +1088,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Create and initialize a FlowFileSwapManager for this connection
- final FlowFileSwapManager swapManager = createSwapManager(nifiProperties);
+ final FlowFileSwapManager swapManager = createSwapManager(nifiProperties, extensionManager);
final EventReporter eventReporter = createEventReporter(getBulletinRepository());
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
@@ -1288,12 +1293,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessorNode procNode;
if (creationSuccessful) {
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider,
- nifiProperties, componentVarRegistry, this, validationTrigger);
+ nifiProperties, componentVarRegistry, this, extensionManager, validationTrigger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider,
- componentType, type, nifiProperties, componentVarRegistry, this, validationTrigger, true);
+ componentType, type, nifiProperties, componentVarRegistry, this, extensionManager, validationTrigger, true);
}
if (registerLogObserver) {
@@ -1331,7 +1336,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (firstTimeAdded) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
} catch (final Exception e) {
if (registerLogObserver) {
@@ -1341,7 +1346,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (firstTimeAdded) {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
}
}
@@ -1353,14 +1358,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private LoggableComponent<Processor> instantiateProcessor(final String type, final String identifier, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
throws ProcessorInstantiationException {
- final Bundle processorBundle = ExtensionManager.getBundle(bundleCoordinate);
+ final Bundle processorBundle = extensionManager.getBundle(bundleCoordinate);
if (processorBundle == null) {
throw new ProcessorInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
}
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final ClassLoader detectedClassLoaderForInstance = ExtensionManager.createInstanceClassLoader(type, identifier, processorBundle, additionalUrls);
+ final ClassLoader detectedClassLoaderForInstance = extensionManager.createInstanceClassLoader(type, identifier, processorBundle, additionalUrls);
final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForInstance);
Thread.currentThread().setContextClassLoader(detectedClassLoaderForInstance);
@@ -1400,7 +1405,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// createProcessor will create a new instance class loader for the same id so
// save the instance class loader to use it for calling OnRemoved on the existing processor
- final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+ final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
// create a new node with firstTimeAdded as true so lifecycle methods get fired
// attempt the creation to make sure it works before firing the OnRemoved methods below
@@ -1412,7 +1417,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final StandardProcessContext processContext = new StandardProcessContext(existingNode, controllerServiceProvider, encryptor, stateManager, () -> false);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
} finally {
- ExtensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+ extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}
// set the new processor in the existing node
@@ -1655,7 +1660,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// invoke any methods annotated with @OnShutdown on Controller Services
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(
+ extensionManager, serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
}
@@ -1664,7 +1670,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// invoke any methods annotated with @OnShutdown on Reporting Tasks
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ConfigurationContext configContext = taskNode.getConfigurationContext();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
}
}
@@ -1987,7 +1993,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
try {
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
- final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
+ final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
serviceNode.pauseValidationTrigger();
serviceNodes.add(serviceNode);
@@ -2097,7 +2103,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Instantiate the processors
//
for (final ProcessorDTO processorDTO : dto.getProcessors()) {
- final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle());
+ final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, processorDTO.getType(), processorDTO.getBundle());
final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
procNode.pauseValidationTrigger();
@@ -2489,21 +2495,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
- for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
+ for (final Class<?> c : extensionManager.getExtensions(Processor.class)) {
final String name = c.getName();
- processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ processorClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyProcessorsInSnippet(templateContents, processorClasses);
final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
- for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+ for (final Class<?> c : extensionManager.getExtensions(ControllerService.class)) {
final String name = c.getName();
- controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ controllerServiceClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyControllerServicesInSnippet(templateContents, controllerServiceClasses);
final Set<String> prioritizerClasses = new HashSet<>();
- for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+ for (final Class<?> c : extensionManager.getExtensions(FlowFilePrioritizer.class)) {
prioritizerClasses.add(c.getName());
}
@@ -2527,21 +2533,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) {
final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
- for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
+ for (final Class<?> c : extensionManager.getExtensions(Processor.class)) {
final String name = c.getName();
- processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ processorClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
- for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+ for (final Class<?> c : extensionManager.getExtensions(ControllerService.class)) {
final String name = c.getName();
- controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+ controllerServiceClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
final Set<String> prioritizerClasses = new HashSet<>();
- for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+ for (final Class<?> c : extensionManager.getExtensions(FlowFilePrioritizer.class)) {
prioritizerClasses.add(c.getName());
}
@@ -2666,7 +2672,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final List<Bundle> prioritizerBundles = ExtensionManager.getBundles(type);
+ final List<Bundle> prioritizerBundles = extensionManager.getBundles(type);
if (prioritizerBundles.size() == 0) {
throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type));
}
@@ -2754,12 +2760,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
//
@SuppressWarnings("rawtypes")
public Set<Class> getFlowFileProcessorClasses() {
- return ExtensionManager.getExtensions(Processor.class);
+ return extensionManager.getExtensions(Processor.class);
}
@SuppressWarnings("rawtypes")
public Set<Class> getFlowFileComparatorClasses() {
- return ExtensionManager.getExtensions(FlowFilePrioritizer.class);
+ return extensionManager.getExtensions(FlowFilePrioritizer.class);
}
/**
@@ -3677,12 +3683,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, componentVarRegistry);
final ReportingTaskNode taskNode;
if (creationSuccessful) {
- taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentVarRegistry, this, validationTrigger);
+ taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentVarRegistry,
+ this, extensionManager, validationTrigger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
- taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, componentVarRegistry, this, validationTrigger, true);
+ taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, componentVarRegistry,
+ this, extensionManager, validationTrigger, true);
}
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
@@ -3697,7 +3705,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
}
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
} catch (final Exception e) {
@@ -3721,12 +3729,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final Bundle reportingTaskBundle = ExtensionManager.getBundle(bundleCoordinate);
+ final Bundle reportingTaskBundle = extensionManager.getBundle(bundleCoordinate);
if (reportingTaskBundle == null) {
throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
}
- final ClassLoader detectedClassLoader = ExtensionManager.createInstanceClassLoader(type, id, reportingTaskBundle, additionalUrls);
+ final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, id, reportingTaskBundle, additionalUrls);
final Class<?> rawClass = Class.forName(type, false, detectedClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoader);
@@ -3763,7 +3771,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// createReportingTask will create a new instance class loader for the same id so
// save the instance class loader to use it for calling OnRemoved on the existing processor
- final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+ final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
// set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
// attempt the creation to make sure it works before firing the OnRemoved methods below
@@ -3773,7 +3781,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext());
} finally {
- ExtensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+ extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}
// set the new reporting task into the existing node
@@ -3828,7 +3836,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
reportingTaskNode.verifyCanDelete();
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getReportingTask().getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getReportingTask().getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
@@ -3849,7 +3857,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier());
processScheduler.onReportingTaskRemoved(reportingTaskNode);
- ExtensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier());
+ extensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier());
}
@Override
@@ -3875,7 +3883,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (firstTimeAdded) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
@@ -3899,7 +3907,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// createControllerService will create a new instance class loader for the same id so
// save the instance class loader to use it for calling OnRemoved on the existing service
- final ClassLoader existingInstanceClassLoader = ExtensionManager.getInstanceClassLoader(id);
+ final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
// create a new node with firstTimeAdded as true so lifecycle methods get called
// attempt the creation to make sure it works before firing the OnRemoved methods below
@@ -3910,7 +3918,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), configurationContext);
} finally {
- ExtensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
+ extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}
// take the invocation handler that was created for new proxy and is set to look at the new node,
@@ -4059,7 +4067,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
service.verifyCanDelete();
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
}
@@ -4080,7 +4088,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
rootControllerServices.remove(service.getIdentifier());
getStateManagerProvider().onComponentRemoved(service.getIdentifier());
- ExtensionManager.removeInstanceClassLoader(service.getIdentifier());
+ extensionManager.removeInstanceClassLoader(service.getIdentifier());
LOG.info("{} removed from Flow Controller", service, this);
}
@@ -4508,17 +4516,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
}
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
}
@@ -4920,7 +4928,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
@SuppressWarnings("rawtypes")
public List<String> getComponentTypes() {
- final Set<Class> procClasses = ExtensionManager.getExtensions(Processor.class);
+ final Set<Class> procClasses = extensionManager.getExtensions(Processor.class);
final List<String> componentTypes = new ArrayList<>(procClasses.size() + 2);
componentTypes.add(ProvenanceEventRecord.REMOTE_INPUT_PORT_TYPE);
componentTypes.add(ProvenanceEventRecord.REMOTE_OUTPUT_PORT_TYPE);
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 957357b..a34d00b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -56,7 +56,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.persistence.FlowConfigurationDAO;
import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
import org.apache.nifi.persistence.TemplateDeserializer;
@@ -190,7 +190,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
autoResumeState = nifiProperties.getAutoResumeState();
- dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor, nifiProperties);
+ dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor, nifiProperties, controller.getExtensionManager());
this.clusterCoordinator = clusterCoordinator;
if (clusterCoordinator != null) {
clusterCoordinator.setFlowService(this);
@@ -1057,7 +1057,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
public void run() {
ClassLoader currentCl = null;
- final Bundle frameworkBundle = NarClassLoaders.getInstance().getFrameworkBundle();
+ final Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();
if (frameworkBundle != null) {
currentCl = Thread.currentThread().getContextClassLoader();
final ClassLoader cl = frameworkBundle.getClassLoader();
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index d47e198..279daf3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -58,6 +58,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.flow.FlowRegistry;
@@ -137,11 +138,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private final StringEncryptor encryptor;
private final boolean autoResumeState;
private final NiFiProperties nifiProperties;
+ private final ExtensionManager extensionManager;
- public StandardFlowSynchronizer(final StringEncryptor encryptor, final NiFiProperties nifiProperties) {
+ public StandardFlowSynchronizer(final StringEncryptor encryptor, final NiFiProperties nifiProperties, final ExtensionManager extensionManager) {
this.encryptor = encryptor;
- autoResumeState = nifiProperties.getAutoResumeState();
+ this.autoResumeState = nifiProperties.getAutoResumeState();
this.nifiProperties = nifiProperties;
+ this.extensionManager = extensionManager;
}
public static boolean isEmpty(final DataFlow dataFlow) {
@@ -490,7 +493,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
if (!withinTemplate(componentElement)) {
final String componentType = DomUtils.getChildText(componentElement, "class");
try {
- BundleUtils.getBundle(componentType, FlowFromDOMFactory.getBundle(bundleElement));
+ BundleUtils.getBundle(extensionManager, componentType, FlowFromDOMFactory.getBundle(bundleElement));
} catch (IllegalStateException e) {
throw new MissingBundleException(e.getMessage(), e);
}
@@ -644,7 +647,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
if (!controllerInitialized || existingFlowEmpty) {
BundleCoordinate coordinate;
try {
- coordinate = BundleUtils.getCompatibleBundle(dto.getType(), dto.getBundle());
+ coordinate = BundleUtils.getCompatibleBundle(extensionManager, dto.getType(), dto.getBundle());
} catch (final IllegalStateException e) {
final BundleDTO bundleDTO = dto.getBundle();
if (bundleDTO == null) {
@@ -1222,7 +1225,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
BundleCoordinate coordinate;
try {
- coordinate = BundleUtils.getCompatibleBundle(processorDTO.getType(), processorDTO.getBundle());
+ coordinate = BundleUtils.getCompatibleBundle(extensionManager, processorDTO.getType(), processorDTO.getBundle());
} catch (final IllegalStateException e) {
final BundleDTO bundleDTO = processorDTO.getBundle();
if (bundleDTO == null) {
@@ -1651,7 +1654,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
// check if the Flow is inheritable
- final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor);
+ final FingerprintFactory fingerprintFactory = new FingerprintFactory(encryptor, extensionManager);
final String existingFlowFingerprintBeforeHash = fingerprintFactory.createFingerprint(existingFlow, controller);
if (existingFlowFingerprintBeforeHash.trim().isEmpty()) {
return null; // no existing flow, so equivalent to proposed flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 2cee3d4..8ab1d6f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -82,6 +82,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
@@ -152,20 +153,22 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties,
- final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger) {
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
+ final ValidationTrigger validationTrigger) {
this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, processor.getComponent().getClass().getSimpleName(),
- processor.getComponent().getClass().getCanonicalName(), nifiProperties, variableRegistry, reloadComponent, validationTrigger, false);
+ processor.getComponent().getClass().getCanonicalName(), nifiProperties, variableRegistry, reloadComponent, extensionManager, validationTrigger, false);
}
public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider,
final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties,
- final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger,
- final boolean isExtensionMissing) {
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
+ final ValidationTrigger validationTrigger, final boolean isExtensionMissing) {
- super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing);
+ super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent,
+ extensionManager, validationTrigger, isExtensionMissing);
final ProcessorDetails processorDetails = new ProcessorDetails(processor);
this.processorRef = new AtomicReference<>(processorDetails);
@@ -916,7 +919,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Set<Relationship> relationships;
final Processor processor = processorRef.get().getProcessor();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
relationships = processor.getRelationships();
}
@@ -983,7 +986,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Set<Relationship> undefined = new HashSet<>();
final Set<Relationship> relationships;
final Processor processor = processorRef.get().getProcessor();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
relationships = processor.getRelationships();
}
@@ -1131,7 +1134,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public Collection<Relationship> getRelationships() {
final Processor processor = processorRef.get().getProcessor();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
return getProcessor().getRelationships();
}
}
@@ -1139,7 +1142,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public String toString() {
final Processor processor = processorRef.get().getProcessor();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
return getProcessor().toString();
}
}
@@ -1161,7 +1164,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Processor processor = processorRef.get().getProcessor();
activateThread();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
processor.onTrigger(context, sessionFactory);
} finally {
deactivateThread();
@@ -1497,7 +1500,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// Now that the task has been scheduled, set the timeout
completionTimestampRef.set(System.currentTimeMillis() + onScheduleTimeoutMillis);
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
try {
activateThread();
try {
@@ -1540,7 +1543,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
+ "initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to " + e, e);
// If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run).
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
activateThread();
try {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
@@ -1637,7 +1640,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState);
activateThread();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
} finally {
deactivateThread();
@@ -1649,7 +1652,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final boolean allThreadsComplete = scheduleState.getActiveThreadCount() == 1;
if (allThreadsComplete) {
activateThread();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
} finally {
deactivateThread();
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 9651add..bce85f8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -40,6 +40,7 @@ import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -67,20 +68,22 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory, final ComponentVariableRegistry variableRegistry,
- final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger) {
+ final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) {
this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory,
reportingTask.getComponent().getClass().getSimpleName(), reportingTask.getComponent().getClass().getCanonicalName(),
- variableRegistry, reloadComponent, validationTrigger, false);
+ variableRegistry, reloadComponent, extensionManager, validationTrigger, false);
}
public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry,
- final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) {
+ final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger,
+ final boolean isExtensionMissing) {
- super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing);
+ super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent,
+ extensionManager, validationTrigger, isExtensionMissing);
this.reportingTaskRef = new AtomicReference<>(new ReportingTaskDetails(reportingTask));
this.processScheduler = processScheduler;
this.serviceLookup = controllerServiceProvider;
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index 124f142..1cc5325 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -29,6 +29,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
@@ -39,17 +40,19 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
- final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger) {
- super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent, validationTrigger);
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent,
+ final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) {
+ super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent, extensionManager, validationTrigger);
this.flowController = controller;
}
public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry,
- final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) {
+ final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger,
+ final boolean isExtensionMissing) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,
- variableRegistry, reloadComponent, validationTrigger, isExtensionMissing);
+ variableRegistry, reloadComponent, extensionManager, validationTrigger, isExtensionMissing);
this.flowController = controller;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index de97225..52dc89c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -36,6 +36,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
@@ -65,6 +66,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final AtomicInteger maxThreadCount;
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final StringEncryptor encryptor;
+ private final ExtensionManager extensionManager;
private volatile String adminYieldDuration = "1 sec";
@@ -72,7 +74,8 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final ConcurrentMap<Connectable, LifecycleState> scheduleStates = new ConcurrentHashMap<>();
public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
- final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
+ final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount,
+ final StringEncryptor encryptor, final ExtensionManager extensionManager) {
super(flowEngine);
this.serviceProvider = serviceProvider;
this.stateManagerProvider = stateManagerProvider;
@@ -80,6 +83,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
this.contextFactory = contextFactory;
this.maxThreadCount = new AtomicInteger(maxThreadCount);
this.encryptor = encryptor;
+ this.extensionManager = extensionManager;
for (int i = 0; i < maxThreadCount; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
@@ -305,7 +309,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
try {
- try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) {
+ try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(extensionManager, worker.getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
logger.error("{} failed to process session due to {}", worker, pe.toString());
@@ -323,7 +327,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
} finally {
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, worker.getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext);
}
}
@@ -346,7 +350,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
try {
- try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass(), worker.getIdentifier())) {
+ try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(extensionManager, worker.getProcessor().getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
@@ -365,7 +369,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
// if the processor is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass(), worker.getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, worker.getProcessor().getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker.getProcessor(), processContext);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 43c5e56..0f73c0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -74,7 +74,7 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask().getIdentifier() + " to run because its scheduling period is not valid");
}
- final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
+ final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, flowController.getExtensionManager());
final AtomicBoolean canceled = new AtomicBoolean(false);
final Date initialDate = cronExpression.getTimeAfter(new Date());
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 0459372..4e396fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -210,7 +210,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
return;
}
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
@@ -257,7 +257,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
synchronized (lifecycleState) {
lifecycleState.setScheduled(false);
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
@@ -483,7 +483,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getClass(), connectable.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index 806fc67..db937e9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -68,7 +68,7 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
@Override
public void doSchedule(final ReportingTaskNode taskNode, final LifecycleState scheduleState) {
- final Runnable reportingTaskWrapper = new ReportingTaskWrapper(taskNode, scheduleState);
+ final Runnable reportingTaskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, flowController.getExtensionManager());
final long schedulingNanos = taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS);
final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(reportingTaskWrapper, 0L, schedulingNanos, TimeUnit.NANOSECONDS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index e5192b3..e82c436 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -194,7 +194,7 @@ public class ControllerServiceLoader {
BundleCoordinate coordinate;
try {
- coordinate = BundleUtils.getCompatibleBundle(dto.getType(), dto.getBundle());
+ coordinate = BundleUtils.getCompatibleBundle(provider.getExtensionManager(), dto.getType(), dto.getBundle());
} catch (final IllegalStateException e) {
final BundleDTO bundleDTO = dto.getBundle();
if (bundleDTO == null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
index ea83edc..1347e78 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.service;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import java.lang.reflect.InvocationTargetException;
@@ -43,19 +44,21 @@ public class StandardControllerServiceInvocationHandler implements ControllerSer
private final ControllerService originalService;
private final AtomicReference<ControllerServiceNode> serviceNodeHolder = new AtomicReference<>(null);
+ private final ExtensionManager extensionManager;
/**
* @param originalService the original service being proxied
*/
- public StandardControllerServiceInvocationHandler(final ControllerService originalService) {
- this(originalService, null);
+ public StandardControllerServiceInvocationHandler(final ExtensionManager extensionManager, final ControllerService originalService) {
+ this(extensionManager, originalService, null);
}
/**
* @param originalService the original service being proxied
* @param serviceNode the node holding the original service which will be used for checking the state (disabled vs running)
*/
- public StandardControllerServiceInvocationHandler(final ControllerService originalService, final ControllerServiceNode serviceNode) {
+ public StandardControllerServiceInvocationHandler(final ExtensionManager extensionManager, final ControllerService originalService, final ControllerServiceNode serviceNode) {
+ this.extensionManager = extensionManager;
this.originalService = originalService;
this.serviceNodeHolder.set(serviceNode);
}
@@ -80,7 +83,7 @@ public class StandardControllerServiceInvocationHandler implements ControllerSer
+ serviceNodeHolder.get().getIdentifier() + " because the Controller Service's State is currently " + state);
}
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, originalService.getClass(), originalService.getIdentifier())) {
return method.invoke(originalService, args);
} catch (final InvocationTargetException e) {
// If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 2323f02..795fc8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -61,6 +61,7 @@ import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.ComponentVariableRegistry;
@@ -92,19 +93,19 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService,
final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory,
final ControllerServiceProvider serviceProvider, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent,
- final ValidationTrigger validationTrigger) {
+ final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) {
this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider, implementation.getComponent().getClass().getSimpleName(),
- implementation.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, validationTrigger, false);
+ implementation.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, extensionManager, validationTrigger, false);
}
public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService,
final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory,
final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass,
- final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger,
- final boolean isExtensionMissing) {
+ final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
+ final ValidationTrigger validationTrigger, final boolean isExtensionMissing) {
- super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing);
+ super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, extensionManager, validationTrigger, isExtensionMissing);
this.serviceProvider = serviceProvider;
this.active = new AtomicBoolean();
setControllerServiceAndProxy(implementation, proxiedControllerService, invocationHandler);
@@ -429,7 +430,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void run() {
try {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
}
@@ -459,7 +460,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
}
stateTransition.disable();
@@ -529,7 +530,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
private void invokeDisable(ConfigurationContext configContext) {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
LOG.debug("Successfully disabled {}", this);
} catch (Exception e) {