You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2021/06/11 18:45:08 UTC
[nifi] branch main updated: NIFI-8625: Refactor scripted components
to use ScriptRunner to fix concurrency issues (#5116)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3e774bc NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues (#5116)
3e774bc is described below
commit 3e774bc5beadd62a646794cc659db9902c5ab1fa
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Fri Jun 11 14:44:55 2021 -0400
NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues (#5116)
NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues
---
.../lookup/script/BaseScriptedLookupService.java | 116 +++++++++----------
.../nifi/processors/script/ExecuteScript.java | 77 +++----------
.../processors/script/InvokeScriptedProcessor.java | 83 +++++++-------
...ptEngineConfigurator.java => ScriptRunner.java} | 24 ++--
.../processors/script/ScriptedTransformRecord.java | 16 +--
.../script/AbstractScriptedRecordFactory.java | 10 --
.../apache/nifi/record/script/ScriptedReader.java | 28 +++--
.../record/script/ScriptedRecordSetWriter.java | 29 +++--
.../record/sink/script/ScriptedRecordSink.java | 75 ++++++------
.../reporting/script/ScriptedReportingTask.java | 41 ++-----
.../rules/engine/script/ScriptedRulesEngine.java | 73 ++++++------
.../handlers/script/ScriptedActionHandler.java | 114 ++++++++++---------
.../script/AbstractScriptedControllerService.java | 9 +-
.../apache/nifi/script/ScriptRunnerFactory.java | 126 +++++++++++++++++++++
.../nifi/script/ScriptingComponentHelper.java | 112 +++++++-----------
.../AbstractModuleClassloaderConfigurator.java | 88 --------------
...gineConfigurator.java => BaseScriptRunner.java} | 27 ++---
...eConfigurator.java => ClojureScriptRunner.java} | 35 +++---
...eConfigurator.java => GenericScriptRunner.java} | 22 ++--
...neConfigurator.java => GroovyScriptRunner.java} | 24 ++--
...nfigurator.java => JavascriptScriptRunner.java} | 19 ++--
.../impl/JythonScriptEngineConfigurator.java | 81 -------------
.../nifi/script/impl/JythonScriptRunner.java | 62 ++++++++++
...nifi.processors.script.ScriptEngineConfigurator | 8 +-
.../script/ScriptedReportingTaskTest.groovy | 23 ++--
.../src/test/resources/groovy/test_reader.groovy | 12 +-
26 files changed, 619 insertions(+), 715 deletions(-)
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java
index ab2cc47..9c7a4b4 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/BaseScriptedLookupService.java
@@ -32,12 +32,13 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.script.AbstractScriptedControllerService;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.File;
import java.util.ArrayList;
@@ -155,10 +156,6 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
- // Need to reset scriptEngine if the value has changed
- if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
- scriptEngine = null;
- }
} else if (instance != null) {
// If the script provides a ConfigurableComponent, call its onPropertyModified() method
try {
@@ -181,67 +178,67 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted LookupService the chance to set up as necessary
- final Invocable invocable = (Invocable) scriptEngine;
- if (configurationContext != null) {
- try {
- // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
- // where lookupService is a proxied interface
- final Object obj = scriptEngine.get("lookupService");
- if (obj != null) {
- try {
- invocable.invokeMethod(obj, "onEnabled", context);
- } catch (final NoSuchMethodException nsme) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Configured script LookupService does not contain an onEnabled() method.");
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
+ // where lookupService is a proxied interface
+ final Object obj = scriptEngine.get("lookupService");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onEnabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script LookupService does not contain an onEnabled() method.");
+ }
}
+ } else {
+ throw new ScriptException("No LookupService was defined by the script.");
}
- } else {
- throw new ScriptException("No LookupService was defined by the script.");
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing onEnabled(context) method", se);
}
- } catch (ScriptException se) {
- throw new ProcessException("Error executing onEnabled(context) method", se);
}
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
@OnDisabled
public void onDisabled(final ConfigurationContext context) {
// Call an non-interface method onDisabled(context), to allow a scripted LookupService the chance to shut down as necessary
- final Invocable invocable = (Invocable) scriptEngine;
- if (configurationContext != null) {
- try {
- // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
- // where lookupService is a proxied interface
- final Object obj = scriptEngine.get("lookupService");
- if (obj != null) {
- try {
- invocable.invokeMethod(obj, "onDisabled", context);
- } catch (final NoSuchMethodException nsme) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Configured script LookupService does not contain an onDisabled() method.");
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
+ // where lookupService is a proxied interface
+ final Object obj = scriptRunner.getScriptEngine().get("lookupService");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onDisabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script LookupService does not contain an onDisabled() method.");
+ }
}
+ } else {
+ throw new ScriptException("No LookupService was defined by the script.");
}
- } else {
- throw new ScriptException("No LookupService was defined by the script.");
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing onDisabled(context) method", se);
}
- } catch (ScriptException se) {
- throw new ProcessException("Error executing onDisabled(context) method", se);
}
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
@Override
public void setup() {
- // Create a single script engine, the Processor object is reused by each task
- if (scriptEngine == null) {
- scriptingComponentHelper.setup(1, getLogger());
- scriptEngine = scriptingComponentHelper.engineQ.poll();
- }
-
- if (scriptEngine == null) {
- throw new ProcessException("No script engine available!");
- }
-
if (scriptNeedsReload.get() || lookupService.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
@@ -266,23 +263,26 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ if (scriptRunner == null) {
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ }
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
+
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
// get configured LookupService from the script (if it exists)
- final Object obj = scriptEngine.get("lookupService");
+ final Object obj = scriptRunner.getScriptEngine().get("lookupService");
if (obj != null) {
final ComponentLog logger = getLogger();
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
index f3b5d92..8d64dcf 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
@@ -52,7 +52,6 @@ import org.apache.nifi.search.Searchable;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
-import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
@@ -120,7 +119,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
- scriptingComponentHelper.createResources();
+ scriptingComponentHelper.createResources(false);
}
}
@@ -150,35 +149,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
return scriptingComponentHelper.customValidate(validationContext);
}
-
- /**
- * Handles changes to this processor's properties. If changes are made to
- * script- or engine-related properties, the script will be reloaded.
- *
- * @param descriptor of the modified property
- * @param oldValue non-null property value (previous)
- * @param newValue the new property value or if null indicates the property
- */
- @Override
- public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-
- if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
- || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
- || ScriptingComponentUtils.MODULES.equals(descriptor)
- || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
-
- // Reset the configurator on change, this can indicate to the configurator to recompile the script on next init()
- String scriptEngineName = scriptingComponentHelper.getScriptEngineName();
- if (scriptEngineName != null) {
- ScriptEngineConfigurator configurator =
- scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
- if (configurator != null) {
- configurator.reset();
- }
- }
- }
- }
-
/**
* Performs setup operations when the processor is scheduled to run. This includes evaluating the processor's
* properties, as well as reloading the script (from file or the "Script Body" property)
@@ -189,9 +159,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
public void setup(final ProcessContext context) {
scriptingComponentHelper.setupVariables(context);
- // Create a script engine for each possible task
- int maxTasks = context.getMaxConcurrentTasks();
- scriptingComponentHelper.setup(maxTasks, getLogger());
scriptToRun = scriptingComponentHelper.getScriptBody();
try {
@@ -203,6 +170,10 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
} catch (IOException ioe) {
throw new ProcessException(ioe);
}
+
+ // Create a script engine for each possible task
+ int maxTasks = context.getMaxConcurrentTasks();
+ scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, getLogger());
}
/**
@@ -220,17 +191,18 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
- scriptingComponentHelper.createResources();
+ scriptingComponentHelper.createResources(false);
}
}
- ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll();
+ ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
ComponentLog log = getLogger();
- if (scriptEngine == null) {
+ if (scriptRunner == null) {
// No engine available so nothing more to do here
return;
}
ProcessSession session = sessionFactory.createSession();
try {
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
try {
Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
@@ -253,31 +225,16 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
}
}
- scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
- // Execute any engine-specific configuration before the script is evaluated
- ScriptEngineConfigurator configurator =
- scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
- // Evaluate the script with the configurator (if it exists) or the engine
- if (configurator != null) {
- configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
- } else {
- scriptEngine.eval(scriptToRun);
- }
+ scriptRunner.run(bindings);
// Commit this session for the user. This plus the outermost catch statement mimics the behavior
// of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
// class with InvokeScriptedProcessor
session.commitAsync();
- } catch (ScriptException e) {
- // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
- ScriptEngineConfigurator configurator =
- scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- }
+ scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
+ } catch (Throwable t) {
+ // Create a new ScriptRunner to replace the one that caused an exception
+ scriptingComponentHelper.setupScriptRunners(false, 1, scriptToRun, getLogger());
// The below 'session.rollback(true)' reverts any changes made during this session (all FlowFiles are
// restored back to their initial session state and back to their original queues after being penalized).
@@ -285,7 +242,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
// cause resource exhaustion. In case a user does not want to yield, it can be set to 0s in the processor
// configuration.
context.yield();
- throw new ProcessException(e);
+ throw new ProcessException(t);
}
} catch (final Throwable t) {
// Mimic AbstractProcessor behavior here
@@ -295,8 +252,6 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
// the flow file from the session binding (ff = session.get()).
session.rollback(true);
throw t;
- } finally {
- scriptingComponentHelper.engineQ.offer(scriptEngine);
}
}
@@ -310,7 +265,7 @@ public class ExecuteScript extends AbstractSessionFactoryProcessor implements Se
// Create the resources whether or not they have been created already, this method is guaranteed to have the instance classloader set
// as the thread context class loader. Other methods that call createResources() may be called from other threads with different
// classloaders
- scriptingComponentHelper.createResources();
+ scriptingComponentHelper.createResources(false);
}
@Override
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
index 366f8fa..681c798 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
@@ -51,6 +51,7 @@ import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter;
import javax.script.Invocable;
+import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.File;
@@ -93,7 +94,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
private final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
- private volatile ScriptEngine scriptEngine = null;
+ private volatile ScriptRunner scriptRunner = null;
private volatile String kerberosServicePrincipal = null;
private volatile File kerberosConfigFile = null;
private volatile File kerberosServiceKeytab = null;
@@ -152,8 +153,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
scriptingComponentHelper.createResources();
}
}
- List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
- supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
+ List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(scriptingComponentHelper.getDescriptors());
final Processor instance = processor.get();
if (instance != null) {
@@ -213,16 +213,6 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
}
public void setup() {
- // Create a single script engine, the Processor object is reused by each task
- if(scriptEngine == null) {
- scriptingComponentHelper.setup(1, getLogger());
- scriptEngine = scriptingComponentHelper.engineQ.poll();
- }
-
- if (scriptEngine == null) {
- throw new ProcessException("No script engine available!");
- }
-
if (scriptNeedsReload.get() || processor.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
@@ -254,7 +244,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
- scriptEngine = null; //reset engine. This happens only when a processor is stopped, so there won't be any performance impact in run-time.
+ scriptRunner = null; //reset engine. This happens only when a processor is stopped, so there won't be any performance impact in run-time.
} else if (instance != null) {
// If the script provides a Processor, call its onPropertyModified() method
try {
@@ -342,20 +332,22 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ if (scriptRunner == null) {
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ }
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
// get configured processor from the script (if it exists)
final Object obj = scriptEngine.get("processor");
@@ -579,32 +571,37 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
invokeScriptedProcessorMethod("onStopped", context);
scriptingComponentHelper.stop();
processor.set(null);
- scriptEngine = null;
+ scriptRunner = null;
}
private void invokeScriptedProcessorMethod(String methodName, Object... params) {
// Run the scripted processor's method here, if it exists
- if (scriptEngine instanceof Invocable) {
- final Invocable invocable = (Invocable) scriptEngine;
- final Object obj = scriptEngine.get("processor");
- if (obj != null) {
-
- ComponentLog logger = getLogger();
- try {
- invocable.invokeMethod(obj, methodName, params);
- } catch (final NoSuchMethodException nsme) {
- if (logger.isDebugEnabled()) {
- logger.debug("Configured script Processor does not contain the method " + methodName);
- }
- } catch (final Exception e) {
- // An error occurred during onScheduled, propagate it up
- logger.error("Error while executing the scripted processor's method " + methodName, e);
- if (e instanceof ProcessException) {
- throw (ProcessException) e;
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ if (scriptEngine instanceof Invocable) {
+ final Invocable invocable = (Invocable) scriptEngine;
+ final Object obj = scriptEngine.get("processor");
+ if (obj != null) {
+
+ ComponentLog logger = getLogger();
+ try {
+ invocable.invokeMethod(obj, methodName, params);
+ } catch (final NoSuchMethodException nsme) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Configured script Processor does not contain the method " + methodName);
+ }
+ } catch (final Exception e) {
+ // An error occurred during onScheduled, propagate it up
+ logger.error("Error while executing the scripted processor's method " + methodName, e);
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ }
+ throw new ProcessException(e);
}
- throw new ProcessException(e);
}
}
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptRunner.java
similarity index 61%
rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java
rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptRunner.java
index 995c16e..dd2278a 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptEngineConfigurator.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptRunner.java
@@ -17,26 +17,22 @@
package org.apache.nifi.processors.script;
-import org.apache.nifi.logging.ComponentLog;
-
+import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
-import java.net.URL;
-/**
- * This interface describes callback methods used by the ExecuteScript/InvokeScript processors to perform
- * engine-specific tasks at various points in the engine lifecycle.
- */
-public interface ScriptEngineConfigurator {
+public interface ScriptRunner {
String getScriptEngineName();
- URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log);
-
- Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException;
+ ScriptEngine getScriptEngine();
- Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException;
+ /**
+ * Runs the script held by this ScriptRunner using the provided Bindings map (to include the session object for example)
+ *
+ * @param bindings The Bindings the underlying engine should use when running the script
+ * @throws ScriptException if an error occurs during execution of the script
+ */
+ void run(Bindings bindings) throws ScriptException;
- default void reset() {
- }
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java
index 46b5e04..efe54de 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java
@@ -172,10 +172,6 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
}
scriptingComponentHelper.setupVariables(context);
-
- // Create a script engine for each possible task
- final int maxTasks = context.getMaxConcurrentTasks();
- scriptingComponentHelper.setup(maxTasks, getLogger());
scriptToRun = scriptingComponentHelper.getScriptBody();
if (scriptToRun == null && scriptingComponentHelper.getScriptPath() != null) {
@@ -183,6 +179,11 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
scriptToRun = IOUtils.toString(scriptStream, Charset.defaultCharset());
}
}
+
+ // Create a script runner for each possible task
+ final int maxTasks = context.getMaxConcurrentTasks();
+ scriptingComponentHelper.setupScriptRunners(maxTasks, scriptToRun, getLogger());
+
// Always compile when first run
compiledScriptRef.set(null);
}
@@ -195,8 +196,8 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
return;
}
- final ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll();
- if (scriptEngine == null) {
+ final ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ if (scriptRunner == null) {
// This shouldn't happen. But just in case.
session.rollback();
return;
@@ -205,6 +206,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
try {
final ScriptEvaluator evaluator;
try {
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
evaluator = createEvaluator(scriptEngine, flowFile);
} catch (final ScriptException se) {
getLogger().error("Failed to initialize script engine", se);
@@ -214,7 +216,7 @@ public class ScriptedTransformRecord extends AbstractProcessor implements Search
transform(flowFile, evaluator, context, session);
} finally {
- scriptingComponentHelper.engineQ.offer(scriptEngine);
+ scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java
index aa36ab4..0c5816d 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java
@@ -30,16 +30,6 @@ public abstract class AbstractScriptedRecordFactory<T> extends AbstractScriptedC
protected final AtomicReference<T> recordFactory = new AtomicReference<>();
public void setup() {
- // Create a single script engine, the Processor object is reused by each task
- if (scriptEngine == null) {
- scriptingComponentHelper.setup(1, getLogger());
- scriptEngine = scriptingComponentHelper.engineQ.poll();
- }
-
- if (scriptEngine == null) {
- throw new ProcessException("No script engine available!");
- }
-
if (scriptNeedsReload.get() || recordFactory.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
if (!reloadScriptFile(scriptingComponentHelper.getScriptPath())) {
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
index 009d1d2..fa4a20c 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
@@ -25,13 +25,15 @@ import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
@@ -65,6 +67,8 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
try {
return recordFactory.get().createRecordReader(variables, in, inputLength, logger);
} catch (UndeclaredThrowableException ute) {
+ scriptRunner = null;
+ scriptingComponentHelper.scriptRunnerQ.clear();
throw new IOException(ute.getCause());
}
}
@@ -84,23 +88,23 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
// get configured processor from the script (if it exists)
- final Object obj = scriptEngine.get("reader");
+ final Object obj = scriptRunner.getScriptEngine().get("reader");
if (obj != null) {
final ComponentLog logger = getLogger();
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index 99763b0..83f26df 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -25,13 +25,15 @@ import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.OutputStream;
@@ -88,23 +90,26 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ if (scriptRunner == null) {
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ }
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
+
// get configured processor from the script (if it exists)
- final Object obj = scriptEngine.get("writer");
+ final Object obj = scriptRunner.getScriptEngine().get("writer");
if (obj != null) {
final ComponentLog logger = getLogger();
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
index f1bbcde..652965f 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
@@ -30,7 +30,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.script.AbstractScriptedControllerService;
import org.apache.nifi.script.ScriptingComponentHelper;
@@ -38,6 +37,8 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSet;
import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
@@ -104,16 +105,6 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem
}
public void setup() {
- // Create a single script engine, the Processor object is reused by each task
- if (scriptEngine == null) {
- scriptingComponentHelper.setup(1, getLogger());
- scriptEngine = scriptingComponentHelper.engineQ.poll();
- }
-
- if (scriptEngine == null) {
- throw new ProcessException("No script engine available!");
- }
-
if (scriptNeedsReload.get() || recordSink.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
@@ -137,23 +128,26 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ if (scriptRunner == null) {
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ }
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
+
// get configured processor from the script (if it exists)
- final Object obj = scriptEngine.get("recordSink");
+ final Object obj = scriptRunner.getScriptEngine().get("recordSink");
if (obj != null) {
final ComponentLog logger = getLogger();
@@ -217,26 +211,31 @@ public class ScriptedRecordSink extends AbstractScriptedControllerService implem
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary
- final Invocable invocable = (Invocable) scriptEngine;
- if (configurationContext != null) {
- try {
- // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
- // where RecordSinkService is a proxied interface
- final Object obj = scriptEngine.get("recordSink");
- if (obj != null) {
- try {
- invocable.invokeMethod(obj, "onEnabled", context);
- } catch (final NoSuchMethodException nsme) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
+ // where RecordSinkService is a proxied interface
+ final Object obj = scriptRunner.getScriptEngine().get("recordSink");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onEnabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
+ }
}
+ } else {
+ throw new ScriptException("No RecordSinkService was defined by the script.");
}
- } else {
- throw new ScriptException("No RecordSinkService was defined by the script.");
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
}
- } catch (ScriptException se) {
- throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
}
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java
index e19022c..f6a7c33 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/reporting/script/ScriptedReportingTask.java
@@ -33,7 +33,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
+import org.apache.nifi.processors.script.ScriptRunner;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.script.ScriptingComponentHelper;
@@ -128,8 +128,7 @@ public class ScriptedReportingTask extends AbstractReportingTask {
public void setup(final ConfigurationContext context) {
scriptingComponentHelper.setupVariables(context);
- // Create a script engine for each possible task
- scriptingComponentHelper.setup(1, getLogger());
+ // Create a script runner
scriptToRun = scriptingComponentHelper.getScriptBody();
try {
@@ -142,6 +141,7 @@ public class ScriptedReportingTask extends AbstractReportingTask {
} catch (IOException ioe) {
throw new ProcessException(ioe);
}
+ scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());
vmMetrics = JmxJvmMetrics.getInstance();
}
@@ -153,15 +153,15 @@ public class ScriptedReportingTask extends AbstractReportingTask {
scriptingComponentHelper.createResources();
}
}
- ScriptEngine scriptEngine = scriptingComponentHelper.engineQ.poll();
+ ScriptRunner scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
ComponentLog log = getLogger();
- if (scriptEngine == null) {
+ if (scriptRunner == null) {
// No engine available so nothing more to do here
return;
}
try {
-
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
try {
Bindings bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
if (bindings == null) {
@@ -180,38 +180,19 @@ public class ScriptedReportingTask extends AbstractReportingTask {
}
}
}
+ scriptRunner.run(bindings);
+ scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
- scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
- // Execute any engine-specific configuration before the script is evaluated
- ScriptEngineConfigurator configurator =
- scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
- // Evaluate the script with the configurator (if it exists) or the engine
- if (configurator != null) {
- configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
- } else {
- scriptEngine.eval(scriptToRun);
- }
} catch (ScriptException e) {
- // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
- ScriptEngineConfigurator configurator =
- scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
+ // Create a new ScriptRunner to replace the one that caused an exception
+ scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());
- // Evaluate the script with the configurator (if it exists) or the engine
- if (configurator != null) {
- configurator.reset();
- }
throw new ProcessException(e);
}
} catch (final Throwable t) {
// Mimic AbstractProcessor behavior here
- getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
+ getLogger().error("{} failed to process due to {}; rolling back session", this, t);
throw t;
- } finally {
- scriptingComponentHelper.engineQ.offer(scriptEngine);
}
-
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java
index 7c46d19..7d09509 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java
@@ -27,13 +27,14 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.rules.Action;
import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.script.AbstractScriptedControllerService;
import org.apache.nifi.script.ScriptingComponentHelper;
import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.Collection;
import java.util.Collections;
@@ -75,16 +76,6 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple
}
public void setup() {
- // Create a single script engine, the component object is reused by each task
- if (scriptEngine == null) {
- scriptingComponentHelper.setup(1, getLogger());
- scriptEngine = scriptingComponentHelper.engineQ.poll();
- }
-
- if (scriptEngine == null) {
- throw new ProcessException("No script engine available!");
- }
-
if (scriptNeedsReload.get() || rulesEngine.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
@@ -108,20 +99,23 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ if (scriptRunner == null) {
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ }
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
+
// get configured processor from the script (if it exists)
final Object obj = scriptEngine.get("rulesEngine");
@@ -188,26 +182,31 @@ public class ScriptedRulesEngine extends AbstractScriptedControllerService imple
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted RulesEngineService the chance to set up as necessary
- final Invocable invocable = (Invocable) scriptEngine;
- if (configurationContext != null) {
- try {
- // Get the actual object from the script engine, versus the proxy stored in RulesEngineService. The object may have additional methods,
- // where RulesEngineService is a proxied interface
- final Object obj = scriptEngine.get("rulesEngine");
- if (obj != null) {
- try {
- invocable.invokeMethod(obj, "onEnabled", context);
- } catch (final NoSuchMethodException nsme) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Configured script RulesEngineService does not contain an onEnabled() method.");
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus the proxy stored in RulesEngineService. The object may have additional methods,
+ // where RulesEngineService is a proxied interface
+ final Object obj = scriptEngine.get("rulesEngine");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onEnabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script RulesEngineService does not contain an onEnabled() method.");
+ }
}
+ } else {
+ throw new ScriptException("No RulesEngineService was defined by the script.");
}
- } else {
- throw new ScriptException("No RulesEngineService was defined by the script.");
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
}
- } catch (ScriptException se) {
- throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
}
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
index bca51a0..2b48bf6 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
@@ -28,7 +28,6 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.rules.Action;
import org.apache.nifi.rules.ActionHandler;
import org.apache.nifi.rules.PropertyContextActionHandler;
@@ -36,6 +35,8 @@ import org.apache.nifi.script.AbstractScriptedControllerService;
import org.apache.nifi.script.ScriptingComponentHelper;
import javax.script.Invocable;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.Collection;
import java.util.Collections;
@@ -77,16 +78,6 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp
}
public void setup() {
- // Create a single script engine, the component object is reused by each task
- if (scriptEngine == null) {
- scriptingComponentHelper.setup(1, getLogger());
- scriptEngine = scriptingComponentHelper.engineQ.poll();
- }
-
- if (scriptEngine == null) {
- throw new ProcessException("No script engine available!");
- }
-
if (scriptNeedsReload.get() || actionHandler.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
@@ -110,23 +101,26 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp
final Collection<ValidationResult> results = new HashSet<>();
try {
+ // Create a single script engine, the Processor object is reused by each task
+ if (scriptRunner == null) {
+ scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+ scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+ }
+
+ if (scriptRunner == null) {
+ throw new ProcessException("No script runner available!");
+ }
// get the engine and ensure its invocable
+ ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
- // Find a custom configurator and invoke their eval() method
- ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
- if (configurator != null) {
- configurator.reset();
- configurator.init(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
- } else {
- // evaluate the script
- scriptEngine.eval(scriptBody);
- }
+ // evaluate the script
+ scriptRunner.run(scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE));
+
// get configured processor from the script (if it exists)
- final Object obj = scriptEngine.get("actionHandler");
+ final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
if (obj != null) {
final ComponentLog logger = getLogger();
@@ -190,53 +184,63 @@ public class ScriptedActionHandler extends AbstractScriptedControllerService imp
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary
- final Invocable invocable = (Invocable) scriptEngine;
- if (configurationContext != null) {
- try {
- // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
- // where ActionHandler is a proxied interface
- final Object obj = scriptEngine.get("actionHandler");
- if (obj != null) {
- try {
- invocable.invokeMethod(obj, "onEnabled", context);
- } catch (final NoSuchMethodException nsme) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
+ if (configurationContext != null) {
+ try {
+ // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
+ // where ActionHandler is a proxied interface
+ final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "onEnabled", context);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+ }
}
+ } else {
+ throw new ScriptException("No ActionHandler was defined by the script.");
}
- } else {
- throw new ScriptException("No ActionHandler was defined by the script.");
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing onEnabled(context) method", se);
}
- } catch (ScriptException se) {
- throw new ProcessException("Error executing onEnabled(context) method", se);
}
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
// Attempt to call a non-ActionHandler interface method (i.e. execute(context, action, facts) from PropertyContextActionHandler)
- final Invocable invocable = (Invocable) scriptEngine;
+ if (scriptRunner != null) {
+ final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+ final Invocable invocable = (Invocable) scriptEngine;
- try {
- // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
- // where ActionHandler is a proxied interface
- final Object obj = scriptEngine.get("actionHandler");
- if (obj != null) {
- try {
- invocable.invokeMethod(obj, "execute", context, action, facts);
- } catch (final NoSuchMethodException nsme) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Configured script ActionHandler is not a PropertyContextActionHandler and has no execute(context, action, facts) method, falling back to"
- + "execute(action, facts).");
+ try {
+ // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
+ // where ActionHandler is a proxied interface
+ final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
+ if (obj != null) {
+ try {
+ invocable.invokeMethod(obj, "execute", context, action, facts);
+ } catch (final NoSuchMethodException nsme) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Configured script ActionHandler is not a PropertyContextActionHandler and has no execute(context, action, facts) method, falling back to"
+ + "execute(action, facts).");
+ }
+ execute(action, facts);
}
- execute(action, facts);
+ } else {
+ throw new ScriptException("No ActionHandler was defined by the script.");
}
- } else {
- throw new ScriptException("No ActionHandler was defined by the script.");
+ } catch (ScriptException se) {
+ throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
}
- } catch (ScriptException se) {
- throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
+ } else {
+ throw new ProcessException("Error creating ScriptRunner");
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
index 52a9e0c..35ea374 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/AbstractScriptedControllerService.java
@@ -25,8 +25,8 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.script.ScriptRunner;
-import javax.script.ScriptEngine;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -46,7 +46,7 @@ public abstract class AbstractScriptedControllerService extends AbstractControll
protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
- protected volatile ScriptEngine scriptEngine = null;
+ protected volatile ScriptRunner scriptRunner = null;
protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
protected volatile ConfigurationContext configurationContext = null;
@@ -65,8 +65,7 @@ public abstract class AbstractScriptedControllerService extends AbstractControll
scriptingComponentHelper.createResources();
}
}
- List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
- supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
+ List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>(scriptingComponentHelper.getDescriptors());
return Collections.unmodifiableList(supportedPropertyDescriptors);
}
@@ -112,7 +111,7 @@ public abstract class AbstractScriptedControllerService extends AbstractControll
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor) || ScriptingComponentUtils.MODULES.equals(descriptor)) {
- scriptEngine = null;
+ scriptRunner = null;
}
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java
new file mode 100644
index 0000000..1b90851
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.script;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.script.ScriptRunner;
+import org.apache.nifi.script.impl.ClojureScriptRunner;
+import org.apache.nifi.script.impl.GenericScriptRunner;
+import org.apache.nifi.script.impl.GroovyScriptRunner;
+import org.apache.nifi.script.impl.JavascriptScriptRunner;
+import org.apache.nifi.script.impl.JythonScriptRunner;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineFactory;
+import javax.script.ScriptException;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ScriptRunnerFactory {
+
+ private final static ScriptRunnerFactory INSTANCE = new ScriptRunnerFactory();
+
+ private ScriptRunnerFactory() {
+
+ }
+
+ public static ScriptRunnerFactory getInstance() {
+ return INSTANCE;
+ }
+
+ public ScriptRunner createScriptRunner(ScriptEngineFactory scriptEngineFactory, String scriptToRun, String[] modulePaths)
+ throws ScriptException {
+ ScriptEngine scriptEngine = scriptEngineFactory.getScriptEngine();
+ String scriptEngineName = scriptEngineFactory.getLanguageName();
+ if ("Groovy".equals(scriptEngineName)) {
+ return new GroovyScriptRunner(scriptEngine, scriptToRun, null);
+ }
+ if ("python".equals(scriptEngineName)) {
+ return new JythonScriptRunner(scriptEngine, scriptToRun, modulePaths);
+ }
+ if ("Clojure".equals(scriptEngineName)) {
+ return new ClojureScriptRunner(scriptEngine, scriptToRun, null);
+ }
+ if ("ECMAScript".equals(scriptEngineName)) {
+ return new JavascriptScriptRunner(scriptEngine, scriptToRun, null);
+ }
+ return new GenericScriptRunner(scriptEngine, scriptToRun, null);
+ }
+
+ /**
+ * Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list
+ * of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar).
+ * Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories
+ * full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each
+ * JAR's URL.
+ *
+ * @param modulePaths An array of module paths to scan/add
+ * @param log A logger for the calling component, to provide feedback for missing files, e.g.
+ * @return An array of URLs corresponding to all modules determined from the input set of module paths.
+ */
+ public URL[] getModuleURLsForClasspath(String scriptEngineName, String[] modulePaths, ComponentLog log) {
+
+ if (!"Clojure".equals(scriptEngineName)
+ && !"Groovy".equals(scriptEngineName)
+ && "ECMAScript".equals(scriptEngineName)) {
+ return new URL[0];
+ }
+
+ List<URL> additionalClasspath = new LinkedList<>();
+
+ if (modulePaths == null) {
+ return new URL[0];
+ }
+ for (String modulePathString : modulePaths) {
+ File modulePath = new File(modulePathString);
+
+ if (modulePath.exists()) {
+ // Add the URL of this path
+ try {
+ additionalClasspath.add(modulePath.toURI().toURL());
+ } catch (MalformedURLException mue) {
+ log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
+ }
+
+ // If the path is a directory, we need to scan for JARs and add them to the classpath
+ if (!modulePath.isDirectory()) {
+ continue;
+ }
+ File[] jarFiles = modulePath.listFiles((dir, name) -> (name != null && name.endsWith(".jar")));
+
+ if (jarFiles == null) {
+ continue;
+ }
+ // Add each to the classpath
+ for (File jarFile : jarFiles) {
+ try {
+ additionalClasspath.add(jarFile.toURI().toURL());
+
+ } catch (MalformedURLException mue) {
+ log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
+ }
+ }
+ } else {
+ log.warn("{} does not exist, ignoring", modulePath.getAbsolutePath());
+ }
+ }
+ return additionalClasspath.toArray(new URL[0]);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java
index 6467086..cf92340 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptingComponentHelper.java
@@ -24,28 +24,27 @@ import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.script.ScriptRunner;
import org.apache.nifi.util.StringUtils;
-import javax.script.ScriptEngine;
+import javax.script.Invocable;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,8 +55,6 @@ public class ScriptingComponentHelper {
public PropertyDescriptor SCRIPT_ENGINE;
- // A map from engine name to a custom configurator for that engine
- public final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap = new ConcurrentHashMap<>();
public final AtomicBoolean isInitialized = new AtomicBoolean(false);
public Map<String, ScriptEngineFactory> scriptEngineFactoryMap;
@@ -68,7 +65,7 @@ public class ScriptingComponentHelper {
private List<AllowableValue> engineAllowableValues;
private ResourceReferences modules;
- public BlockingQueue<ScriptEngine> engineQ = null;
+ public BlockingQueue<ScriptRunner> scriptRunnerQ = null;
public String getScriptEngineName() {
return scriptEngineName;
@@ -135,14 +132,18 @@ public class ScriptingComponentHelper {
return results;
}
+ public void createResources() {
+ createResources(true);
+ }
+
/**
* This method creates all resources needed for the script processor to function, such as script engines,
* script file reloader threads, etc.
*/
- public void createResources() {
+ public void createResources(final boolean requireInvocable) {
descriptors = new ArrayList<>();
// The following is required for JRuby, should be transparent to everything else.
- // Note this is not done in a ScriptEngineConfigurator, as it is too early in the lifecycle. The
+ // Note this is not done in a ScriptRunner, as it is too early in the lifecycle. The
// setting must be there before the factories/engines are loaded.
System.setProperty("org.jruby.embed.localvariable.behavior", "persistent");
@@ -153,12 +154,14 @@ public class ScriptingComponentHelper {
scriptEngineFactoryMap = new HashMap<>(scriptEngineFactories.size());
List<AllowableValue> engineList = new LinkedList<>();
for (ScriptEngineFactory factory : scriptEngineFactories) {
- engineList.add(new AllowableValue(factory.getLanguageName()));
- scriptEngineFactoryMap.put(factory.getLanguageName(), factory);
+ if (!requireInvocable || factory.getScriptEngine() instanceof Invocable) {
+ engineList.add(new AllowableValue(factory.getLanguageName()));
+ scriptEngineFactoryMap.put(factory.getLanguageName(), factory);
+ }
}
// Sort the list by name so the list always looks the same.
- Collections.sort(engineList, (o1, o2) -> {
+ engineList.sort((o1, o2) -> {
if (o1 == null) {
return o2 == null ? 0 : 1;
}
@@ -169,7 +172,7 @@ public class ScriptingComponentHelper {
});
engineAllowableValues = engineList;
- AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]);
+ AllowableValue[] engines = engineList.toArray(new AllowableValue[0]);
SCRIPT_ENGINE = new PropertyDescriptor.Builder()
.name("Script Engine")
@@ -200,55 +203,32 @@ public class ScriptingComponentHelper {
return path != null && Files.isRegularFile(Paths.get(path));
}
- /**
- * Performs common setup operations when the processor is scheduled to run. This method assumes the member
- * variables associated with properties have been filled.
- *
- * @param numberOfScriptEngines number of engines to setup
- */
- public void setup(int numberOfScriptEngines, ComponentLog log) {
-
- if (scriptEngineConfiguratorMap.isEmpty()) {
- ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
- ServiceLoader.load(ScriptEngineConfigurator.class);
- for (ScriptEngineConfigurator configurator : configuratorServiceLoader) {
- scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator);
- }
- }
- setupEngines(numberOfScriptEngines, log);
+ public void setupScriptRunners(final int numberOfScriptEngines, final String scriptToRun, final ComponentLog log) {
+ setupScriptRunners(true, numberOfScriptEngines, scriptToRun, log);
}
/**
- * Configures the specified script engine. First, the engine is loaded and instantiated using the JSR-223
+ * Configures the specified script engine(s) as a queue of ScriptRunners. First, the engine is loaded and instantiated using the JSR-223
* javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is
* called, and the configurator is saved for future calls.
*
* @param numberOfScriptEngines number of engines to setup
- * @see org.apache.nifi.processors.script.ScriptEngineConfigurator
+ * @see org.apache.nifi.processors.script.ScriptRunner
*/
- protected void setupEngines(int numberOfScriptEngines, ComponentLog log) {
- engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines);
+ public void setupScriptRunners(final boolean newQ, final int numberOfScriptEngines, final String scriptToRun, final ComponentLog log) {
+ if (newQ) {
+ scriptRunnerQ = new LinkedBlockingQueue<>(numberOfScriptEngines);
+ }
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
if (StringUtils.isBlank(scriptEngineName)) {
throw new IllegalArgumentException("The script engine name cannot be null");
}
- ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
-
// Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs
- URL[] additionalClasspathURLs = null;
- if (configurator != null) {
- final String[] locations = modules.asLocations().toArray(new String[0]);
- additionalClasspathURLs = configurator.getModuleURLsForClasspath(locations, log);
- } else {
- if (modules != null) {
- final List<URL> urls = modules.asURLs();
- if (!urls.isEmpty()) {
- additionalClasspathURLs = urls.toArray(new URL[urls.size()]);
- }
- }
- }
+ final String[] locations = modules.asLocations().toArray(new String[0]);
+ final URL[] additionalClasspathURLs = ScriptRunnerFactory.getInstance().getModuleURLsForClasspath(scriptEngineName, locations, log);
+
// Need the right classloader when the engine is created. This ensures the NAR's execution class loader
// (plus the module path) becomes the parent for the script engine
@@ -259,11 +239,17 @@ public class ScriptingComponentHelper {
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
}
- for (int i = 0; i < numberOfScriptEngines; i++) {
- ScriptEngine scriptEngine = createScriptEngine();
- if (!engineQ.offer(scriptEngine)) {
- log.error("Error adding script engine {}", new Object[]{scriptEngine.getFactory().getEngineName()});
+ try {
+ for (int i = 0; i < numberOfScriptEngines; i++) {
+ //
+ ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName);
+ ScriptRunner scriptRunner = ScriptRunnerFactory.getInstance().createScriptRunner(factory, scriptToRun, locations);
+ if (!scriptRunnerQ.offer(scriptRunner)) {
+ log.error("Error adding script engine {}", scriptRunner.getScriptEngineName());
+ }
}
+ } catch (ScriptException se) {
+ throw new ProcessException("Could not instantiate script engines", se);
}
} finally {
// Restore original context class loader
@@ -278,27 +264,9 @@ public class ScriptingComponentHelper {
modules = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources().flattenRecursively();
}
-
- /**
- * Provides a ScriptEngine corresponding to the currently selected script engine name.
- * ScriptEngineManager.getEngineByName() doesn't use find ScriptEngineFactory.getName(), which
- * is what we used to populate the list. So just search the list of factories until a match is
- * found, then create and return a script engine.
- *
- * @return a Script Engine corresponding to the currently specified name, or null if none is found.
- */
- protected ScriptEngine createScriptEngine() {
- //
- ScriptEngineFactory factory = scriptEngineFactoryMap.get(scriptEngineName);
- if (factory == null) {
- return null;
- }
- return factory.getScriptEngine();
- }
-
public void stop() {
- if (engineQ != null) {
- engineQ.clear();
+ if (scriptRunnerQ != null) {
+ scriptRunnerQ.clear();
}
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/AbstractModuleClassloaderConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/AbstractModuleClassloaderConfigurator.java
deleted file mode 100644
index 2285b1f..0000000
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/AbstractModuleClassloaderConfigurator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.script.impl;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This base class provides a common implementation for the getModuleURLsForClasspath method of the
- * ScriptEngineConfigurator interface
- */
-public abstract class AbstractModuleClassloaderConfigurator implements ScriptEngineConfigurator {
-
- /**
- * Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list
- * of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar).
- * Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories
- * full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each
- * JAR's URL.
- * @param modulePaths An array of module paths to scan/add
- * @param log A logger for the calling component, to provide feedback for missing files, e.g.
- * @return An array of URLs corresponding to all modules determined from the input set of module paths.
- */
- @Override
- public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) {
- List<URL> additionalClasspath = new LinkedList<>();
- if (modulePaths != null) {
- for (String modulePathString : modulePaths) {
- File modulePath = new File(modulePathString);
-
- if (modulePath.exists()) {
- // Add the URL of this path
- try {
- additionalClasspath.add(modulePath.toURI().toURL());
- } catch (MalformedURLException mue) {
- log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
- }
-
- // If the path is a directory, we need to scan for JARs and add them to the classpath
- if (modulePath.isDirectory()) {
- File[] jarFiles = modulePath.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return (name != null && name.endsWith(".jar"));
- }
- });
-
- if (jarFiles != null) {
- // Add each to the classpath
- for (File jarFile : jarFiles) {
- try {
- additionalClasspath.add(jarFile.toURI().toURL());
-
- } catch (MalformedURLException mue) {
- log.warn("{} is not a valid file/folder, ignoring", new Object[]{modulePath.getAbsolutePath()}, mue);
- }
- }
- }
- }
- } else {
- log.warn("{} does not exist, ignoring", new Object[]{modulePath.getAbsolutePath()});
- }
- }
- }
- return additionalClasspath.toArray(new URL[additionalClasspath.size()]);
- }
-}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/BaseScriptRunner.java
similarity index 57%
copy from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java
copy to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/BaseScriptRunner.java
index f89d06f..d3ff616 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/BaseScriptRunner.java
@@ -16,27 +16,28 @@
*/
package org.apache.nifi.script.impl;
+import org.apache.nifi.processors.script.ScriptRunner;
+
import javax.script.ScriptEngine;
-import javax.script.ScriptException;
/**
- * This class offers methods to perform Javascript-specific operations during the script engine lifecycle.
+ * This base class provides a common implementation for the member variables underlying the
+ * ScriptRunner interface
*/
-public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
+public abstract class BaseScriptRunner implements ScriptRunner {
- @Override
- public String getScriptEngineName() {
- return "ECMAScript";
- }
+ protected ScriptEngine scriptEngine;
+ protected String scriptBody;
+ protected String[] modulePaths;
- @Override
- public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- // No initialization methods needed at present
- return engine;
+ public BaseScriptRunner(final ScriptEngine engine, final String scriptBody, final String[] modulePaths) {
+ this.scriptEngine = engine;
+ this.scriptBody = scriptBody;
+ this.modulePaths = modulePaths;
}
@Override
- public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- return engine.eval(scriptBody);
+ public ScriptEngine getScriptEngine() {
+ return scriptEngine;
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptRunner.java
similarity index 77%
rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptEngineConfigurator.java
rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptRunner.java
index 3754d4c..67eb59b 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptEngineConfigurator.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/ClojureScriptRunner.java
@@ -18,10 +18,12 @@ package org.apache.nifi.script.impl;
import org.apache.nifi.processors.script.engine.ClojureScriptEngine;
+import javax.script.Bindings;
+import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
-public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
+public class ClojureScriptRunner extends BaseScriptRunner {
private static final String PRELOADS =
"(:import \n"
@@ -37,37 +39,30 @@ public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderCo
+ "[org.apache.nifi.processor.exception FlowFileAccessException FlowFileHandlingException MissingFlowFileException ProcessException]\n"
+ "[org.apache.nifi.processor.io InputStreamCallback OutputStreamCallback StreamCallback]\n"
+ "[org.apache.nifi.processor.util FlowFileFilters StandardValidators]\n"
- + "[org.apache.nifi.processors.script ExecuteScript InvokeScriptedProcessor ScriptEngineConfigurator]\n"
+ + "[org.apache.nifi.processors.script ExecuteScript InvokeScriptedProcessor ScriptRunner]\n"
+ "[org.apache.nifi.script ScriptingComponentHelper ScriptingComponentUtils]\n"
+ "[org.apache.nifi.logging ComponentLog]\n"
+ "[org.apache.nifi.lookup LookupService RecordLookupService StringLookupService LookupFailureException]\n"
+ "[org.apache.nifi.record.sink RecordSinkService]\n"
+ ")\n";
-
- private ScriptEngine scriptEngine;
+ public ClojureScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) {
+ super(engine, scriptBody, modulePaths);
+ }
@Override
public String getScriptEngineName() {
return "Clojure";
}
-
- @Override
- public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- scriptEngine = engine;
- return scriptEngine;
- }
-
@Override
- public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- scriptEngine = engine;
- StringBuilder sb = new StringBuilder("(ns ");
- sb.append(((ClojureScriptEngine) scriptEngine).getNamespace());
- sb.append(" ");
- sb.append(PRELOADS);
- sb.append(")\n");
- sb.append(scriptBody);
- return engine.eval(sb.toString());
+ public void run(Bindings bindings) throws ScriptException {
+ String sb = "(ns " + ((ClojureScriptEngine) scriptEngine).getNamespace() +
+ " " +
+ PRELOADS +
+ ")\n" +
+ scriptBody;
+ scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
+ scriptEngine.eval(sb);
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java
similarity index 63%
copy from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java
copy to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java
index f89d06f..028c1af 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java
@@ -16,27 +16,29 @@
*/
package org.apache.nifi.script.impl;
+import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
/**
- * This class offers methods to perform Javascript-specific operations during the script engine lifecycle.
+ * This class offers methods to perform operations during the script runner lifecycle.
*/
-public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
+public class GenericScriptRunner extends BaseScriptRunner {
- @Override
- public String getScriptEngineName() {
- return "ECMAScript";
+ private String engineName = "Unknown";
+
+ public GenericScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) {
+ super(engine, scriptBody, modulePaths);
+ this.engineName = engine.getFactory().getEngineName();
}
@Override
- public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- // No initialization methods needed at present
- return engine;
+ public String getScriptEngineName() {
+ return engineName;
}
@Override
- public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- return engine.eval(scriptBody);
+ public void run(Bindings bindings) throws ScriptException {
+ scriptEngine.eval(scriptBody);
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptRunner.java
similarity index 71%
rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptEngineConfigurator.java
rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptRunner.java
index b86d356..a99cbfa 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptEngineConfigurator.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GroovyScriptRunner.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.script.impl;
+import javax.script.Bindings;
+import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
-public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
+public class GroovyScriptRunner extends BaseScriptRunner {
private static final String PRELOADS =
"import org.apache.nifi.components.*\n"
@@ -34,26 +36,18 @@ public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderCon
+ "import org.apache.nifi.record.sink.*\n"
+ "import org.apache.nifi.lookup.*\n";
- private ScriptEngine scriptEngine;
+ public GroovyScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) {
+ super(engine, scriptBody, modulePaths);
+ }
@Override
public String getScriptEngineName() {
return "Groovy";
}
-
-
- @Override
- public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- // No need to compile the script here, Groovy does it under the hood and its CompiledScript object just
- // calls engine.eval() the same as we do in the eval() method below
- scriptEngine = engine;
- return scriptEngine;
- }
-
@Override
- public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- scriptEngine = engine;
- return engine.eval(PRELOADS + scriptBody);
+ public void run(Bindings bindings) throws ScriptException {
+ scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
+ scriptEngine.eval(PRELOADS + scriptBody);
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptRunner.java
similarity index 69%
rename from nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java
rename to nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptRunner.java
index f89d06f..39dc3b4 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptEngineConfigurator.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JavascriptScriptRunner.java
@@ -16,27 +16,26 @@
*/
package org.apache.nifi.script.impl;
+import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
/**
- * This class offers methods to perform Javascript-specific operations during the script engine lifecycle.
+ * This class offers methods to perform Javascript-specific operations during the script runner lifecycle.
*/
-public class JavascriptScriptEngineConfigurator extends AbstractModuleClassloaderConfigurator {
+public class JavascriptScriptRunner extends BaseScriptRunner {
- @Override
- public String getScriptEngineName() {
- return "ECMAScript";
+ public JavascriptScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) {
+ super(engine, scriptBody, modulePaths);
}
@Override
- public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- // No initialization methods needed at present
- return engine;
+ public String getScriptEngineName() {
+ return "ECMAScript";
}
@Override
- public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- return engine.eval(scriptBody);
+ public void run(Bindings bindings) throws ScriptException {
+ scriptEngine.eval(scriptBody);
}
}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptEngineConfigurator.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptEngineConfigurator.java
deleted file mode 100644
index 14e291f..0000000
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptEngineConfigurator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.script.impl;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.script.ScriptEngineConfigurator;
-import org.python.core.PyString;
-
-import javax.script.Compilable;
-import javax.script.CompiledScript;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-/**
- * A helper class to configure the Jython engine with any specific requirements
- */
-public class JythonScriptEngineConfigurator implements ScriptEngineConfigurator {
-
- private final AtomicReference<CompiledScript> compiledScriptRef = new AtomicReference<>();
-
- @Override
- public String getScriptEngineName() {
- return "python";
- }
-
- @Override
- public URL[] getModuleURLsForClasspath(String[] modulePaths, ComponentLog log) {
- // We don't need to add the module paths to the classpath, they will be added via sys.path.append
- return new URL[0];
- }
-
- @Override
- public Object init(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- // Always compile when first run
- if (engine != null && compiledScriptRef.get() == null) {
- // Add prefix for import sys and all jython modules
- String prefix = "import sys\n"
- + Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
- .collect(Collectors.joining("\n"));
- final CompiledScript compiled = ((Compilable) engine).compile(prefix + scriptBody);
- compiledScriptRef.set(compiled);
- }
- return compiledScriptRef.get();
- }
-
- @Override
- public Object eval(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
- Object returnValue = null;
- if (engine != null) {
- final CompiledScript existing = compiledScriptRef.get();
- if (existing == null) {
- throw new ScriptException("Jython script has not been compiled successfully, the component must be restarted.");
- }
- returnValue = compiledScriptRef.get().eval();
- }
- return returnValue;
- }
-
- @Override
- public void reset() {
- compiledScriptRef.set(null);
- }
-}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptRunner.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptRunner.java
new file mode 100644
index 0000000..da315b0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/JythonScriptRunner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.script.impl;
+
+import org.python.core.PyString;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class to configure the Jython engine with any specific requirements
+ */
+public class JythonScriptRunner extends BaseScriptRunner {
+
+ private final CompiledScript compiledScript;
+
+ public JythonScriptRunner(ScriptEngine engine, String scriptBody, String[] modulePaths) throws ScriptException {
+ super(engine, scriptBody, modulePaths);
+ // Add prefix for import sys and all jython modules
+ String prefix = "import sys\n"
+ + Arrays.stream(modulePaths).map((modulePath) -> "sys.path.append(" + PyString.encode_UnicodeEscape(modulePath, true) + ")")
+ .collect(Collectors.joining("\n"));
+ compiledScript = ((Compilable) engine).compile(prefix + scriptBody);
+ }
+
+ @Override
+ public String getScriptEngineName() {
+ return "python";
+ }
+
+ @Override
+ public ScriptEngine getScriptEngine() {
+ return scriptEngine;
+ }
+
+ @Override
+ public void run(Bindings bindings) throws ScriptException {
+ if (compiledScript == null) {
+ throw new ScriptException("Jython script has not been successfully compiled");
+ }
+ compiledScript.eval();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator
index fa53e29..5ee7038 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.script.ScriptEngineConfigurator
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.script.impl.ClojureScriptEngineConfigurator
-org.apache.nifi.script.impl.JythonScriptEngineConfigurator
-org.apache.nifi.script.impl.GroovyScriptEngineConfigurator
-org.apache.nifi.script.impl.JavascriptScriptEngineConfigurator
+org.apache.nifi.script.impl.ClojureScriptRunner
+org.apache.nifi.script.impl.JythonScriptRunner
+org.apache.nifi.script.impl.GroovyScriptRunner
+org.apache.nifi.script.impl.JavascriptScriptRunner
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy
index 47fff2a..609fbb5 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/reporting/script/ScriptedReportingTaskTest.groovy
@@ -96,11 +96,12 @@ class ScriptedReportingTaskTest {
task.onTrigger context
// This script should return a variable x with the number of events and a variable e with the first event
- def se = task.scriptEngine
+ def sr = task.scriptRunner
+ def se = sr.scriptEngine
assertEquals 3, se.x
assertEquals '1234', se.e.componentId
assertEquals 'xyz', se.e.attributes.abc
- task.offerScriptEngine(se)
+ task.offerScriptRunner(sr)
}
private ProvenanceEventRecord createProvenanceEvent(final long id) {
@@ -138,10 +139,11 @@ class ScriptedReportingTaskTest {
task.setup configurationContext
task.onTrigger context
- def se = task.scriptEngine
+ def sr = task.scriptRunner
+ def se = sr.scriptEngine
// This script should store a variable called x with a map of stats to values
assertTrue se.x?.uptime >= 0
- task.offerScriptEngine(se)
+ task.offerScriptRunner(sr)
}
@@ -171,20 +173,21 @@ class ScriptedReportingTaskTest {
task.setup configurationContext
task.onTrigger context
- def se = task.scriptEngine
+ def sr = task.scriptRunner
+ def se = sr.scriptEngine
// This script should store a variable called x with a map of stats to values
assertTrue se.x?.uptime >= 0
- task.offerScriptEngine(se)
+ task.offerScriptRunner(sr)
}
class MockScriptedReportingTask extends ScriptedReportingTask implements AccessibleScriptingComponentHelper {
- def getScriptEngine() {
- return scriptingComponentHelper.engineQ.poll()
+ def getScriptRunner() {
+ return scriptingComponentHelper.scriptRunnerQ.poll()
}
- def offerScriptEngine(engine) {
- scriptingComponentHelper.engineQ.offer(engine)
+ def offerScriptRunner(runner) {
+ scriptingComponentHelper.scriptRunnerQ.offer(runner)
}
@Override
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
index b4908cd..03395a2 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy
@@ -1,11 +1,3 @@
-import org.apache.nifi.components.PropertyDescriptor
-import org.apache.nifi.components.ValidationResult
-import org.apache.nifi.processor.ProcessContext
-import org.apache.nifi.processor.ProcessSessionFactory
-import org.apache.nifi.processor.ProcessorInitializationContext
-import org.apache.nifi.processor.Relationship
-import org.apache.nifi.processor.exception.ProcessException
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -67,7 +59,7 @@ class GroovyProcessor implements Processor {
def session = sessionFactory.createSession()
def flowFile = session.get();
if (flowFile == null) {
- return;
+ return
}
flowFile = session.putAttribute(flowFile, 'from-content', setAttributeFromThisInOnScheduled)
// transfer
@@ -102,4 +94,4 @@ class GroovyProcessor implements Processor {
}
}
-processor = new GroovyProcessor();
\ No newline at end of file
+processor = new GroovyProcessor()
\ No newline at end of file