You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/08 18:22:57 UTC
[3/3] nifi git commit: NIFI-2909 Adding per-instance class loading
capability through @RequiresInstanceClassLoading annotation NIFI-1712
Applying per-instance class loading to HBaseClientService to allow specifying
Phoenix Client JAR
NIFI-2909 Adding per-instance class loading capability through @RequiresInstanceClassLoading annotation
NIFI-1712 Applying per-instance class loading to HBaseClientService to allow specifying Phoenix Client JAR
-Refactoring the ClassLoading so that every processor, controller service, and reporting task gets an InstanceClassLoader with a parent of the NAR ClassLoader, and only components with @RequiresInstanceClassLoading will make a copy of the NAR ClassLoader resources, and addressing some review feedback
This closes #1156
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1d05372
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1d05372
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1d05372
Branch: refs/heads/master
Commit: d1d053725b72d91fbfca1a2e86691e9c9a1a3f2f
Parents: 2f0d9a3
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Oct 10 09:27:57 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Tue Nov 8 13:21:27 2016 -0500
----------------------------------------------------------------------
.../behavior/RequiresInstanceClassLoading.java | 42 +++
.../nifi/components/PropertyDescriptor.java | 28 ++
.../org/apache/nifi/util/file/FileUtils.java | 1 +
.../util/file/classloader/ClassLoaderUtils.java | 79 ++++--
.../file/classloader/TestClassLoaderUtils.java | 41 +++
.../src/main/asciidoc/developer-guide.adoc | 52 ++++
.../init/ControllerServiceInitializer.java | 9 +-
.../init/ProcessorInitializer.java | 8 +-
.../init/ReportingTaskingInitializer.java | 8 +-
.../controller/AbstractConfiguredComponent.java | 209 +++++++++-----
.../nifi/controller/ConfiguredComponent.java | 21 +-
.../apache/nifi/controller/ProcessorNode.java | 9 +-
.../apache/nifi/controller/FlowController.java | 60 ++--
.../controller/StandardFlowSynchronizer.java | 23 +-
.../nifi/controller/StandardProcessorNode.java | 39 +--
.../reporting/AbstractReportingTaskNode.java | 33 +--
.../reporting/StandardReportingTaskNode.java | 11 +-
.../scheduling/EventDrivenSchedulingAgent.java | 8 +-
.../scheduling/StandardProcessScheduler.java | 6 +-
.../service/ControllerServiceLoader.java | 15 +-
.../service/StandardControllerServiceNode.java | 36 +--
.../StandardControllerServiceProvider.java | 16 +-
.../tasks/ContinuallyRunConnectableTask.java | 4 +-
.../tasks/ContinuallyRunProcessorTask.java | 2 +-
.../controller/tasks/ReportingTaskWrapper.java | 4 +-
.../nifi/groups/StandardProcessGroup.java | 8 +-
.../controller/TestStandardProcessorNode.java | 275 ++++++++++++++++++-
.../scheduling/TestProcessorLifecycle.java | 37 +--
.../TestStandardProcessScheduler.java | 18 +-
.../TestStandardControllerServiceProvider.java | 76 ++---
.../service/util/TestControllerService.java | 2 +-
.../ModifiesClasspathNoAnnotationProcessor.java | 50 ++++
.../processors/ModifiesClasspathProcessor.java | 50 ++++
.../org.apache.nifi.processor.Processor | 16 ++
.../TestClasspathResources/resource1.txt | 15 +
.../TestClasspathResources/resource2.txt | 15 +
.../TestClasspathResources/resource3.txt | 15 +
.../src/test/resources/logback-test.xml | 7 +-
.../org/apache/nifi/nar/ExtensionManager.java | 82 ++++++
.../apache/nifi/nar/InstanceClassLoader.java | 147 ++++++++++
.../java/org/apache/nifi/nar/NarCloseable.java | 20 +-
.../nifi/web/controller/ControllerFacade.java | 2 +-
.../dao/impl/StandardControllerServiceDAO.java | 10 +-
.../nifi/web/dao/impl/StandardProcessorDAO.java | 10 +-
.../web/dao/impl/StandardReportingTaskDAO.java | 10 +-
.../nifi/controller/MonitorMemoryTest.java | 14 +-
.../apache/nifi/hbase/HBaseClientService.java | 8 +
.../nifi/hbase/HBase_1_1_2_ClientService.java | 5 +-
48 files changed, 1280 insertions(+), 376 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java
new file mode 100644
index 0000000..f7566a6
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/RequiresInstanceClassLoading.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.behavior;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation a component can use to indicate that the framework should create a new ClassLoader
+ * for each instance of the component, copying all resources from the component's NARClassLoader to a
+ * new ClassLoader which will only be used by a given instance of the component.
+ *
+ * This annotation is typically used when a component has one or more PropertyDescriptors which set
+ * dynamicallyModifiesClasspath(boolean) to true.
+ *
+ * When this annotation is used it is important to note that each added instance of the component will increase
+ * the overall memory footprint more than that of a component without this annotation.
+ */
+@Documented
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface RequiresInstanceClassLoading {
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
index 532a034..1299d3d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
@@ -79,6 +79,11 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
* Language
*/
private final boolean expressionLanguageSupported;
+ /**
+ * indicates whether or not this property represents resources that should be added
+ * to the classpath for this instance of the component
+ */
+ private final boolean dynamicallyModifiesClasspath;
/**
* the interface of the {@link ControllerService} that this Property refers
@@ -102,6 +107,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
this.required = builder.required;
this.sensitive = builder.sensitive;
this.dynamic = builder.dynamic;
+ this.dynamicallyModifiesClasspath = builder.dynamicallyModifiesClasspath;
this.expressionLanguageSupported = builder.expressionLanguageSupported;
this.controllerServiceDefinition = builder.controllerServiceDefinition;
this.validators = new ArrayList<>(builder.validators);
@@ -232,6 +238,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
private boolean sensitive = false;
private boolean expressionLanguageSupported = false;
private boolean dynamic = false;
+ private boolean dynamicallyModifiesClasspath = false;
private Class<? extends ControllerService> controllerServiceDefinition;
private List<Validator> validators = new ArrayList<>();
@@ -244,6 +251,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
this.required = specDescriptor.required;
this.sensitive = specDescriptor.sensitive;
this.dynamic = specDescriptor.dynamic;
+ this.dynamicallyModifiesClasspath = specDescriptor.dynamicallyModifiesClasspath;
this.expressionLanguageSupported = specDescriptor.expressionLanguageSupported;
this.controllerServiceDefinition = specDescriptor.getControllerServiceDefinition();
this.validators = new ArrayList<>(specDescriptor.validators);
@@ -332,6 +340,22 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
}
/**
+ * Specifies that the value of this property represents one or more resources that the
+ * framework should add to the classpath of the given component.
+ *
+ * NOTE: If a component contains a PropertyDescriptor where dynamicallyModifiesClasspath is set to true,
+ * the component must also be annotated with @RequiresInstanceClassLoading, otherwise the component will be
+ * considered invalid.
+ *
+ * @param dynamicallyModifiesClasspath whether or not this property should be used by the framework to modify the classpath
+ * @return the builder
+ */
+ public Builder dynamicallyModifiesClasspath(final boolean dynamicallyModifiesClasspath) {
+ this.dynamicallyModifiesClasspath = dynamicallyModifiesClasspath;
+ return this;
+ }
+
+ /**
* @param values constrained set of values
* @return the builder
*/
@@ -492,6 +516,10 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
return expressionLanguageSupported;
}
+ public boolean isDynamicClasspathModifier() {
+ return dynamicallyModifiesClasspath;
+ }
+
public Class<? extends ControllerService> getControllerServiceDefinition() {
return controllerServiceDefinition;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
index ff4da8e..960bc40 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
@@ -591,4 +591,5 @@ public class FileUtils {
return digest.digest();
}
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
index bc6728c..318d0a7 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/classloader/ClassLoaderUtils.java
@@ -17,6 +17,8 @@
package org.apache.nifi.util.file.classloader;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FilenameFilter;
@@ -24,23 +26,54 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Set;
public class ClassLoaderUtils {
- public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
- // Split and trim the module path(s)
- List<String> modules = (modulePath == null)
- ? null
- : Arrays.stream(modulePath.split(",")).filter(StringUtils::isNotBlank).map(String::trim).collect(Collectors.toList());
+ static final Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class);
- URL[] classpaths = getURLsForClasspath(modules, filenameFilter);
+ public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
+ URL[] classpaths = getURLsForClasspath(modulePath, filenameFilter, false);
return createModuleClassLoader(classpaths, parentClassLoader);
}
- protected static URL[] getURLsForClasspath(List<String> modulePaths, FilenameFilter filenameFilter) throws MalformedURLException {
+ /**
+ *
+ * @param modulePath a module path to get URLs from, the module path may be a comma-separated list of paths
+ * @param filenameFilter a filter to apply when a module path is a directory and performs a listing, a null filter will return all matches
+ * @return an array of URL instances representing all of the modules resolved from processing modulePath
+ * @throws MalformedURLException if a module path does not exist
+ */
+ public static URL[] getURLsForClasspath(String modulePath, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
+ return getURLsForClasspath(modulePath == null ? Collections.emptySet() : Collections.singleton(modulePath), filenameFilter, suppressExceptions);
+ }
+
+ /**
+ *
+ * @param modulePaths one or more module paths to get URLs from, each module path may be a comma-separated list of paths
+ * @param filenameFilter a filter to apply when a module path is a directory and performs a listing, a null filter will return all matches
+ * @param suppressExceptions if true then all modules will attempt to be resolved even if some throw an exception, if false the first exception will be thrown
+ * @return an array of URL instances representing all of the modules resolved from processing modulePaths
+ * @throws MalformedURLException if a module path does not exist
+ */
+ public static URL[] getURLsForClasspath(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
+ // use LinkedHashSet to maintain the ordering that the incoming paths are processed
+ Set<String> modules = new LinkedHashSet<>();
+ if (modulePaths != null) {
+ modulePaths.stream()
+ .flatMap(path -> Arrays.stream(path.split(",")))
+ .filter(StringUtils::isNotBlank)
+ .map(String::trim)
+ .forEach(m -> modules.add(m));
+ }
+ return toURLs(modules, filenameFilter, suppressExceptions);
+ }
+
+ protected static URL[] toURLs(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
List<URL> additionalClasspath = new LinkedList<>();
if (modulePaths != null) {
for (String modulePathString : modulePaths) {
@@ -52,23 +85,33 @@ public class ClassLoaderUtils {
isUrl = false;
}
if (!isUrl) {
- File modulePath = new File(modulePathString);
+ try {
+ File modulePath = new File(modulePathString);
- if (modulePath.exists()) {
+ if (modulePath.exists()) {
- additionalClasspath.add(modulePath.toURI().toURL());
+ additionalClasspath.add(modulePath.toURI().toURL());
- if (modulePath.isDirectory()) {
- File[] files = modulePath.listFiles(filenameFilter);
+ if (modulePath.isDirectory()) {
+ File[] files = modulePath.listFiles(filenameFilter);
- if (files != null) {
- for (File jarFile : files) {
- additionalClasspath.add(jarFile.toURI().toURL());
+ if (files != null) {
+ for (File classpathResource : files) {
+ if (classpathResource.isDirectory()) {
+ logger.warn("Recursive directories are not supported, skipping " + classpathResource.getAbsolutePath());
+ } else {
+ additionalClasspath.add(classpathResource.toURI().toURL());
+ }
+ }
}
}
+ } else {
+ throw new MalformedURLException("Path specified does not exist");
+ }
+ } catch (MalformedURLException e) {
+ if (!suppressExceptions) {
+ throw e;
}
- } else {
- throw new MalformedURLException("Path specified does not exist");
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
index d2826e3..ba85e07 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/classloader/TestClassLoaderUtils.java
@@ -18,9 +18,13 @@ package org.apache.nifi.util.file.classloader;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -79,6 +83,43 @@ public class TestClassLoaderUtils {
assertNotNull(ClassLoaderUtils.getCustomClassLoader(jarFilePath, this.getClass().getClassLoader(), getJarFilenameFilter()));
}
+ @Test
+ public void testGetURLsForClasspathWithDirectory() throws MalformedURLException {
+ final String jarFilePath = "src/test/resources/TestClassLoaderUtils";
+ URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false);
+ assertEquals(2, urls.length);
+ }
+
+ @Test
+ public void testGetURLsForClasspathWithSingleJAR() throws MalformedURLException {
+ final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar";
+ URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false);
+ assertEquals(1, urls.length);
+ }
+
+ @Test(expected = MalformedURLException.class)
+ public void testGetURLsForClasspathWithSomeNonExistentAndNoSuppression() throws MalformedURLException {
+ final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest.jar";
+ ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false);
+ }
+
+ @Test
+ public void testGetURLsForClasspathWithSomeNonExistentAndSuppression() throws MalformedURLException {
+ final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest.jar";
+ URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, true);
+ assertEquals(1, urls.length);
+ }
+
+ @Test
+ public void testGetURLsForClasspathWithSetAndSomeNonExistentAndSuppression() throws MalformedURLException {
+ final Set<String> modules = new HashSet<>();
+ modules.add("src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest1.jar");
+ modules.add("src/test/resources/TestClassLoaderUtils/FakeTest2.jar,src/test/resources/TestClassLoaderUtils/FakeTest3.jar");
+
+ URL[] urls = ClassLoaderUtils.getURLsForClasspath(modules, null, true);
+ assertEquals(1, urls.length);
+ }
+
protected FilenameFilter getJarFilenameFilter(){
return (dir, name) -> name != null && name.endsWith(".jar");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-docs/src/main/asciidoc/developer-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index 299f510..195b4f1 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -2269,6 +2269,58 @@ API artifacts into the same NAR is often acceptable.
+[[per-instance-classloading]]
+== Per-Instance ClassLoading
+
+A component developer may wish to add additional resources to the component's classpath at runtime.
+For example, you may want to provide the location of a JDBC driver to a processor that interacts with a
+relational database, thus allowing the processor to work with any driver rather than trying to bundle a
+driver into the NAR.
+
+This may be accomplished by declaring one or more PropertyDescriptor instances with
+`dynamicallyModifiesClasspath` set to true. For example:
+
+
+[source,java]
+----
+PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder()
+ .name("Extra Resources")
+ .description("The path to one or more resources to add to the classpath.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamicallyModifiesClasspath(true)
+ .build();
+----
+
+When these properties are set on a component, the framework identifies all properties where
+`dynamicallyModifiesClasspath` is set to true. For each of these properties, the framework
+attempts to resolve filesystem resources from the value of the property. The value may be a
+comma-separated list of one or more directories or files, where any paths that do not exist are
+skipped. If the resource represents a directory, the directory is listed, and all of the files
+in that directory are added to the classpath individually.
+
+Each property may impose further restrictions on the format of the value through the validators.
+For example, using StandardValidators.FILE_EXISTS_VALIDATOR restricts the property to accepting a
+single file. Using StandardValidators.NON_EMPTY_VALIDATOR allows any combination of comma-separated
+files or directories.
+
+Resources are added to the instance ClassLoader by adding them to an inner ClassLoader that is always
+checked first. Anytime the value of these properties change, the inner ClassLoader is closed and
+re-created with the new resources.
+
+NiFi provides the `@RequiresInstanceClassLoading` annotation to further expand and isolate the libraries
+available on a component's classpath. You can annotate a component with `@RequiresInstanceClassLoading`
+to indicate that the instance ClassLoader for the component requires a copy of all the resources in the
+component's NAR ClassLoader. When `@RequiresInstanceClassLoading` is not present, the
+instance ClassLoader simply has its parent ClassLoader set to the NAR ClassLoader, rather than
+copying resources.
+
+Because `@RequiresInstanceClassLoading` copies resources from the NAR ClassLoader for each instance of the
+component, use this capability judiciously. If ten instances of one component are created, all classes
+from the component's NAR ClassLoader are loaded into memory ten times. This could eventually increase the
+memory footprint significantly when enough instances of the component are created.
+
+
== How to contribute to Apache NiFi
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
index c641afe..90c1e24 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
@@ -19,6 +19,7 @@ package org.apache.nifi.documentation.init;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
import org.apache.nifi.documentation.mock.MockConfigurationContext;
import org.apache.nifi.documentation.mock.MockControllerServiceInitializationContext;
@@ -38,15 +39,15 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia
@Override
public void initialize(ConfigurableComponent component) throws InitializationException {
ControllerService controllerService = (ControllerService) component;
-
- try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
- controllerService.initialize(new MockControllerServiceInitializationContext());
+ ControllerServiceInitializationContext context = new MockControllerServiceInitializationContext();
+ try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) {
+ controllerService.initialize(context);
}
}
@Override
public void teardown(ConfigurableComponent component) {
- try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
ControllerService controllerService = (ControllerService) component;
final ComponentLog logger = new MockComponentLogger();
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
index 221f9e5..ae28299 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
@@ -26,6 +26,7 @@ import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.ProcessorInitializationContext;
/**
* Initializes a Processor using a MockProcessorInitializationContext
@@ -37,15 +38,16 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
@Override
public void initialize(ConfigurableComponent component) {
Processor processor = (Processor) component;
- try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
- processor.initialize(new MockProcessorInitializationContext());
+ ProcessorInitializationContext initializationContext = new MockProcessorInitializationContext();
+ try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), initializationContext.getIdentifier())) {
+ processor.initialize(initializationContext);
}
}
@Override
public void teardown(ConfigurableComponent component) {
Processor processor = (Processor) component;
- try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
final ComponentLog logger = new MockComponentLogger();
final MockProcessContext context = new MockProcessContext();
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
index 8233e2e..041ff3e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
@@ -25,6 +25,7 @@ import org.apache.nifi.documentation.mock.MockReportingInitializationContext;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
/**
@@ -37,15 +38,16 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial
@Override
public void initialize(ConfigurableComponent component) throws InitializationException {
ReportingTask reportingTask = (ReportingTask) component;
- try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
- reportingTask.initialize(new MockReportingInitializationContext());
+ ReportingInitializationContext context = new MockReportingInitializationContext();
+ try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) {
+ reportingTask.initialize(context);
}
}
@Override
public void teardown(ConfigurableComponent component) {
ReportingTask reportingTask = (ReportingTask) component;
- try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context);
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 6460050..cc9404e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -16,10 +16,27 @@
*/
package org.apache.nifi.controller;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.nar.InstanceClassLoader;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -30,14 +47,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.nifi.components.ConfigurableComponent;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.nar.NarCloseable;
-
public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {
private final String id;
@@ -48,13 +57,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
private final AtomicReference<String> annotationData = new AtomicReference<>();
private final String componentType;
private final String componentCanonicalClass;
+ private final VariableRegistry variableRegistry;
+ private final ComponentLog logger;
+
private final Lock lock = new ReentrantLock();
private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();
public AbstractConfiguredComponent(final ConfigurableComponent component, final String id,
- final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
- final String componentType, final String componentCanonicalClass) {
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
+ final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
+ final ComponentLog logger) {
this.id = id;
this.component = component;
this.validationContextFactory = validationContextFactory;
@@ -62,6 +75,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
this.name = new AtomicReference<>(component.getClass().getSimpleName());
this.componentType = componentType;
this.componentCanonicalClass = componentCanonicalClass;
+ this.variableRegistry = variableRegistry;
+ this.logger = logger;
}
@Override
@@ -90,44 +105,69 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
}
+ /**
+ * Applies a batch of property changes to this component while holding the component lock.
+ * An entry with a null value removes that property, any other entry sets it, and entries
+ * with a null key are skipped. Values of properties that dynamically modify the classpath
+ * are collected and passed on to processClasspathModifiers once all entries have been applied.
+ *
+ * @param properties property names mapped to their new values; a null map is a no-op
+ */
@Override
- public void setProperty(final String name, final String value) {
- if (null == name || null == value) {
- throw new IllegalArgumentException();
+ public void setProperties(Map<String, String> properties) {
+ if (properties == null) {
+ return;
}
lock.lock();
try {
verifyModifiable();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
- final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
+ // processClasspathModifiers (called below) expects the thread's context class loader to be an
+ // InstanceClassLoader, so the whole update runs inside the loader context selected by this node's id.
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), id)) {
+ final Set<String> modulePaths = new LinkedHashSet<>();
+ for (final Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey() != null && entry.getValue() == null) {
+ removeProperty(entry.getKey());
+ } else if (entry.getKey() != null) {
+ setProperty(entry.getKey(), entry.getValue());
+
+ // for properties that dynamically modify the classpath, evaluate expression language in the
+ // non-empty value (presumably against the variable registry only) and remember the result
+ final PropertyDescriptor descriptor = component.getPropertyDescriptor(entry.getKey());
+ if (descriptor.isDynamicClasspathModifier() && !StringUtils.isEmpty(entry.getValue())) {
+ final StandardPropertyValue propertyValue = new StandardPropertyValue(entry.getValue(), null, variableRegistry);
+ modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue());
+ }
+ }
+ }
- final String oldValue = properties.put(descriptor, value);
- if (!value.equals(oldValue)) {
+ // NOTE(review): modulePaths only holds values supplied in this call, and this runs even when it is
+ // empty. If InstanceClassLoader.setInstanceResources replaces rather than appends, a partial update
+ // would presumably drop classpath entries configured by an earlier call - confirm.
+ processClasspathModifiers(modulePaths);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
- if (descriptor.getControllerServiceDefinition() != null) {
- if (oldValue != null) {
- final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue);
- if (oldNode != null) {
- oldNode.removeReference(this);
- }
- }
+ /**
+ * Stores a single property value, keeps controller service references in sync and notifies the
+ * wrapped component of the change. Kept private, like removeProperty, so that all calls go through
+ * setProperties, which takes the lock and sets up the class loader context before calling this.
+ *
+ * @param name the name of the property to set
+ * @param value the new value of the property
+ * @throws IllegalArgumentException if the name or the value is null
+ */
+ private void setProperty(final String name, final String value) {
+ if (null == name || null == value) {
+ throw new IllegalArgumentException("Name or Value can not be null");
+ }
- final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value);
- if (newNode != null) {
- newNode.addReference(this);
- }
- }
+ final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
- try {
- component.onPropertyModified(descriptor, oldValue, value);
- } catch (final Exception e) {
- // nothing really to do here...
+ final String oldValue = properties.put(descriptor, value);
+ // The value is stored unconditionally; references and the component are only touched on a real change.
+ if (!value.equals(oldValue)) {
+
+ // The property identifies a controller service: move this component's reference from the
+ // previously referenced service (if any) to the newly referenced one (if it can be found).
+ if (descriptor.getControllerServiceDefinition() != null) {
+ if (oldValue != null) {
+ final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue);
+ if (oldNode != null) {
+ oldNode.removeReference(this);
}
}
+
+ final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value);
+ if (newNode != null) {
+ newNode.addReference(this);
+ }
+ }
+
+ try {
+ component.onPropertyModified(descriptor, oldValue, value);
+ } catch (final Exception e) {
+ // A failing callback must not abort the update, but it is logged (as removeProperty does)
+ // instead of being swallowed silently.
+ logger.error(e.getMessage(), e);
}
- } finally {
- lock.unlock();
}
}
@@ -141,48 +181,74 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
* @return true if removed; false otherwise
* @throws java.lang.IllegalArgumentException if the name is null
*/
- @Override
- public boolean removeProperty(final String name) {
+ private boolean removeProperty(final String name) {
+ // Now private: the caller visible in this class, setProperties, already holds the lock, has called
+ // verifyModifiable() and has set up the class loader context, so none of that is repeated here.
if (null == name) {
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Name can not be null");
}
- lock.lock();
- try {
- verifyModifiable();
+ final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
+ String value = null;
+ // Required properties are never removed, and there is nothing to do when no value was stored:
+ // both cases fall through to "return false".
+ if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
- final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
- String value = null;
- if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
-
- if (descriptor.getControllerServiceDefinition() != null) {
- if (value != null) {
- final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
- if (oldNode != null) {
- oldNode.removeReference(this);
- }
- }
+ // The removed value identified a controller service: drop this component from its references.
+ if (descriptor.getControllerServiceDefinition() != null) {
+ // value is already known to be non-null here (checked by the enclosing condition)
+ if (value != null) {
+ final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
+ if (oldNode != null) {
+ oldNode.removeReference(this);
}
+ }
+ }
- try {
- component.onPropertyModified(descriptor, value, null);
- } catch (final Exception e) {
- // nothing really to do here...
- }
+ // Tell the component about the removal; a failure in its callback is logged and does not
+ // undo the removal, so true is still returned.
+ try {
+ component.onPropertyModified(descriptor, value, null);
+ } catch (final Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
- return true;
+ /**
+ * Resolves the given module paths to URLs and hands them to the InstanceClassLoader that is expected to be
+ * the current thread's context class loader. If the context class loader is not an InstanceClassLoader,
+ * a warning is logged and nothing is changed.
+ *
+ * @param modulePaths a set of module paths where each entry can be a comma-separated list of multiple module paths
+ */
+ private void processClasspathModifiers(final Set<String> modulePaths) {
+ try {
+ // The last argument (true) presumably suppresses errors for paths that cannot be resolved, as the
+ // comment in the catch block below implies - confirm against ClassLoaderUtils.getURLsForClasspath.
+ final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding {} resources to the classpath for {}", new Object[] {urls.length, name});
+ for (URL url : urls) {
+ logger.debug(url.getFile());
}
}
- } finally {
- lock.unlock();
+
+ // The caller is expected to have set up the class loader context already; it is only read here.
+ final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+ if (!(classLoader instanceof InstanceClassLoader)) {
+ // Really shouldn't happen, but if we somehow got here and don't have an InstanceClassLoader then log a warning and move on
+ final String classLoaderName = classLoader == null ? "null" : classLoader.getClass().getName();
+ if (logger.isWarnEnabled()) {
+ logger.warn(String.format("Unable to modify the classpath for %s, expected InstanceClassLoader, but found %s", name, classLoaderName));
+ }
+ return;
+ }
+
+ final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) classLoader;
+ // NOTE(review): the name suggests the URLs replace any previously set instance resources rather than
+ // being appended to them - confirm against InstanceClassLoader.
+ instanceClassLoader.setInstanceResources(urls);
+ } catch (MalformedURLException e) {
+ // Shouldn't get here since we are suppressing errors
+ logger.warn("Error processing classpath resources", e);
}
- return false;
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
if (supported == null || supported.isEmpty()) {
return Collections.unmodifiableMap(properties);
@@ -226,35 +292,35 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public String toString() {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ // Delegates to the wrapped component inside the class loader context selected by its class and identifier.
+ // NOTE(review): setProperties passes this node's own id field instead; presumably the same value - confirm.
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.toString();
}
}
@Override
public Collection<ValidationResult> validate(final ValidationContext context) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ // Validation is performed by the wrapped component itself, inside the class loader context
+ // selected by the component's class and identifier.
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.validate(context);
}
}
@Override
public PropertyDescriptor getPropertyDescriptor(final String name) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ // The descriptor lookup is answered by the wrapped component, inside the class loader context
+ // selected by the component's class and identifier.
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.getPropertyDescriptor(name);
}
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ // Forwards the notification to the wrapped component inside its class loader context. Unlike the
+ // private setProperty/removeProperty paths, exceptions thrown by the component are not caught here.
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
component.onPropertyModified(descriptor, oldValue, newValue);
}
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ // Returns whatever the wrapped component reports, queried inside the class loader context
+ // selected by the component's class and identifier.
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.getPropertyDescriptors();
}
}
@@ -286,7 +352,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
validationResults = component.validate(validationContext);
}
@@ -327,4 +393,9 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
protected ValidationContextFactory getValidationContextFactory() {
return this.validationContextFactory;
}
+
+ /**
+ * @return the VariableRegistry that was passed to this component's constructor
+ */
+ protected VariableRegistry getVariableRegistry() {
+ return this.variableRegistry;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
index f1ee11e..91119ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
@@ -35,25 +35,7 @@ public interface ConfiguredComponent extends Authorizable {
public void setAnnotationData(String data);
- /**
- * Sets the property with the given name to the given value
- *
- * @param name the name of the property to update
- * @param value the value to update the property to
- */
- public void setProperty(String name, String value);
-
- /**
- * Removes the property and value for the given property name if a
- * descriptor and value exists for the given name. If the property is
- * optional its value might be reset to default or will be removed entirely
- * if was a dynamic property.
- *
- * @param name the property to remove
- * @return true if removed; false otherwise
- * @throws java.lang.IllegalArgumentException if the name is null
- */
- public boolean removeProperty(String name);
+ /**
+ * Applies the given property values to this component in a single call. This takes the place of the
+ * former setProperty(String, String) and removeProperty(String) methods. In AbstractConfiguredComponent
+ * an entry with a null value removes that property, any other entry sets it, entries with a null key
+ * are skipped, and a null map is a no-op.
+ *
+ * @param properties property names mapped to their new values
+ */
+ public void setProperties(Map<String, String> properties);
public Map<PropertyDescriptor, String> getProperties();
@@ -75,4 +57,5 @@ public interface ConfiguredComponent extends Authorizable {
* @return the Canonical Class Name of the component
*/
String getCanonicalClassName();
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 0fe306c..08b4abe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -28,10 +28,12 @@ import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +45,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
protected final AtomicReference<ScheduledState> scheduledState;
public ProcessorNode(final Processor processor, final String id,
- final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
- final String componentType, final String componentCanonicalClass) {
- super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass);
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
+ final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
+ final ComponentLog logger) {
+ // variableRegistry and logger are not used here; they are forwarded to AbstractConfiguredComponent.
+ super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
+ // Every processor node starts out in the STOPPED state.
this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 89a4379..ba5ed36 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -731,7 +731,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private void notifyComponentsConfigurationRestored() {
for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) {
final Processor processor = procNode.getProcessor();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
}
@@ -739,7 +739,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
@@ -747,7 +747,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ReportingTask task = taskNode.getReportingTask();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass(), task.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
}
}
@@ -1046,21 +1046,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
creationSuccessful = false;
}
+ final ComponentLog logger = new SimpleProcessLogger(id, processor);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
final ProcessorNode procNode;
if (creationSuccessful) {
- procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties);
+ procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, logger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
- procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties);
+ procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, logger);
}
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
if (firstTimeAdded) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
} catch (final Exception e) {
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
@@ -1068,7 +1069,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (firstTimeAdded) {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
}
}
@@ -1082,14 +1083,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
+ final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type, identifier);
final Class<?> rawClass;
if (detectedClassLoaderForType == null) {
// try to find from the current class loader
rawClass = Class.forName(type);
} else {
// try to find from the registered classloader for that type
- rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type));
+ rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type, identifier));
}
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
@@ -1328,7 +1329,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// invoke any methods annotated with @OnShutdown on Controller Services
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
}
@@ -1337,7 +1338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// invoke any methods annotated with @OnShutdown on Reporting Tasks
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ConfigurationContext configContext = taskNode.getConfigurationContext();
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
}
}
@@ -1609,12 +1610,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final String serviceId = controllerServiceDTO.getId();
final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
-
- for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) {
- if (entry.getValue() != null) {
- serviceNode.setProperty(entry.getKey(), entry.getValue());
- }
- }
+ serviceNode.setProperties(controllerServiceDTO.getProperties());
}
//
@@ -1728,11 +1724,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (config.getProperties() != null) {
- for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
- if (entry.getValue() != null) {
- procNode.setProperty(entry.getKey(), entry.getValue());
- }
- }
+ procNode.setProperties(config.getProperties());
}
group.addProcessor(procNode);
@@ -2826,7 +2818,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
boolean creationSuccessful = true;
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
- final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type);
+ final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type, id);
final Class<?> rawClass;
if (detectedClassLoader == null) {
rawClass = Class.forName(type);
@@ -2851,15 +2843,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ final ComponentLog logger = new SimpleProcessLogger(id, task);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
final ReportingTaskNode taskNode;
if (creationSuccessful) {
- taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry);
+ taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, logger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
- taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry);
+ taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, logger);
}
taskNode.setName(task.getClass().getSimpleName());
@@ -2875,7 +2868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
}
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
} catch (final Exception e) {
@@ -2929,7 +2922,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
reportingTaskNode.verifyCanDelete();
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getReportingTask().getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
@@ -2947,6 +2940,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
+ ExtensionManager.removeInstanceClassLoaderIfExists(reportingTaskNode.getIdentifier());
}
@Override
@@ -2966,7 +2960,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (firstTimeAdded) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
@@ -3085,7 +3079,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
service.verifyCanDelete();
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass())) {
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
}
@@ -3106,6 +3100,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
rootControllerServices.remove(service.getIdentifier());
getStateManagerProvider().onComponentRemoved(service.getIdentifier());
+ ExtensionManager.removeInstanceClassLoaderIfExists(service.getIdentifier());
+
LOG.info("{} removed from Flow Controller", service, this);
}
@@ -3451,17 +3447,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
}
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index eb9bcac..8cfb3f3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -408,11 +408,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
.filter(e -> controllerServiceMapping.containsKey(e.getValue()))
.collect(Collectors.toSet());
+ final Map<String,String> controllerServiceProps = new HashMap<>();
+
for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) {
final PropertyDescriptor propertyDescriptor = propEntry.getKey();
final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue());
- reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier());
+ controllerServiceProps.put(propertyDescriptor.getName(), clone.getIdentifier());
}
+
+ reportingTask.setProperties(controllerServiceProps);
}
}
}
@@ -514,14 +518,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
reportingTask.setAnnotationData(dto.getAnnotationData());
-
- for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
- if (entry.getValue() == null) {
- reportingTask.removeProperty(entry.getKey());
- } else {
- reportingTask.setProperty(entry.getKey(), entry.getValue());
- }
- }
+ reportingTask.setProperties(dto.getProperties());
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
@@ -922,13 +919,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
procNode.setAutoTerminatedRelationships(relationships);
}
- for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
- if (entry.getValue() == null) {
- procNode.removeProperty(entry.getKey());
- } else {
- procNode.setProperty(entry.getKey(), entry.getValue());
- }
- }
+ procNode.setProperties(config.getProperties());
final ScheduledState scheduledState = ScheduledState.valueOf(processorDTO.getState());
if (ScheduledState.RUNNING.equals(scheduledState)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d05372/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 42790fd..5ff9f22 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -54,6 +54,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -135,19 +136,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// ??????? NOT any more
public StandardProcessorNode(final Processor processor, final String uuid,
- final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
- final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties) {
+ final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
+ final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties,
+ final VariableRegistry variableRegistry, final ComponentLog logger) {
this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider,
- processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties);
+ processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties, variableRegistry, logger);
}
public StandardProcessorNode(final Processor processor, final String uuid,
- final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
- final ControllerServiceProvider controllerServiceProvider,
- final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties) {
+ final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
+ final ControllerServiceProvider controllerServiceProvider,
+ final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties,
+ final VariableRegistry variableRegistry, final ComponentLog logger) {
- super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);
+ super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
this.processor = processor;
identifier = new AtomicReference<>(uuid);
@@ -811,7 +814,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
Relationship returnRel = specRel;
final Set<Relationship> relationships;
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
relationships = processor.getRelationships();
}
@@ -857,7 +860,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public Set<Relationship> getUndefinedRelationships() {
final Set<Relationship> undefined = new HashSet<>();
final Set<Relationship> relationships;
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
relationships = processor.getRelationships();
}
@@ -913,7 +916,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
validationResults = getProcessor().validate(validationContext);
}
@@ -960,7 +963,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
validationResults = getProcessor().validate(validationContext);
}
@@ -1036,14 +1039,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public Collection<Relationship> getRelationships() {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
return getProcessor().getRelationships();
}
}
@Override
public String toString() {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
return getProcessor().toString();
}
}
@@ -1060,7 +1063,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
processor.onTrigger(context, sessionFactory);
}
}
@@ -1240,7 +1243,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
@Override
public Void call() throws Exception {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
return null;
}
@@ -1250,7 +1253,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED);
@@ -1325,7 +1328,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
try {
if (scheduleState.isScheduled()) {
schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState);
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
}
}
@@ -1334,7 +1337,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// performing the lifecycle actions counts as 1 thread.
final boolean allThreadsComplete = scheduleState.getActiveThreadCount() == 1;
if (allThreadsComplete) {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
+ try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
}