You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/04 15:59:38 UTC

[GitHub] [nifi] pgyori commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

pgyori commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r645662468



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
             } catch (ScriptException e) {
-                // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-                if (configurator != null) {
-                    configurator.reset();
-                }
+                // Create a new ScriptRunner to replace the one that caused an exception
+                scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());

Review comment:
       If I understand correctly, if we have multiple ScriptRunners in the queue, but one causes an exception, we get rid of all the ScriptRunners in the queue and create a queue with only 1 ScriptRunner in it. Do we not need maxTasks number of them?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
##########
@@ -217,26 +211,31 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
-                // where RecordSinkService is a proxied interface
-                final Object obj = scriptEngine.get("recordSink");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;

Review comment:
       In the reloadScript() method there is a check:
   `if (scriptEngine instanceof Invocable)`
   Is it not needed here as well?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -190,53 +184,63 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
-                // where ActionHandler is a proxied interface
-                final Object obj = scriptEngine.get("actionHandler");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;
+            if (configurationContext != null) {
+                try {
+                    // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
+                    // where ActionHandler is a proxied interface
+                    final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
+                    if (obj != null) {
+                        try {
+                            invocable.invokeMethod(obj, "onEnabled", context);
+                        } catch (final NoSuchMethodException nsme) {
+                            if (getLogger().isDebugEnabled()) {
+                                getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+                            }
                         }
+                    } else {
+                        throw new ScriptException("No ActionHandler was defined by the script.");
                     }
-                } else {
-                    throw new ScriptException("No ActionHandler was defined by the script.");
+                } catch (ScriptException se) {
+                    throw new ProcessException("Error executing onEnabled(context) method", se);
                 }
-            } catch (ScriptException se) {
-                throw new ProcessException("Error executing onEnabled(context) method", se);
             }
+        } else {
+            throw new ProcessException("Error creating ScriptRunner");
         }

Review comment:
       This part is very similar to what is in the execute() method. Can it be organized in a separate method to avoid duplication?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -110,23 +101,26 @@ protected boolean reloadScript(final String scriptBody) {
         final Collection<ValidationResult> results = new HashSet<>();
 
         try {
+            // Create a single script engine, the Processor object is reused by each task
+            if (scriptRunner == null) {
+                scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+                scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+            }
+
+            if (scriptRunner == null) {
+                throw new ProcessException("No script engine available!");

Review comment:
       Should we still say "script engine", not "script runner"?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java
##########
@@ -108,20 +99,23 @@ protected boolean reloadScript(final String scriptBody) {
         final Collection<ValidationResult> results = new HashSet<>();
 
         try {
+            // Create a single script engine, the Processor object is reused by each task
+            if (scriptRunner == null) {
+                scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+                scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+            }
+
+            if (scriptRunner == null) {
+                throw new ProcessException("No script engine available!");

Review comment:
       Should we still say "script engine", not "script runner"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org