Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/02 13:51:51 UTC

[GitHub] [nifi] mattyb149 opened a new pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

mattyb149 opened a new pull request #5116:
URL: https://github.com/apache/nifi/pull/5116


   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   Replaces ScriptEngineConfigurator with ScriptRunner, which abstracts the details of using the ScriptEngine (including optionally compiling the script) and avoids sharing the ProcessSession between concurrent tasks.
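As a rough illustration of the idea (not the PR's actual code), a pool that hands each concurrent task its own runner might look like the sketch below. `RunnerPool`, `setup`, `borrow`, and `giveBack` are hypothetical names; in the PR the corresponding pieces appear to be `scriptingComponentHelper.scriptRunnerQ` and `setupScriptRunners(...)`, whose full signatures are not shown here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/**
 * Hypothetical pool that hands each concurrent task its own runner, so no two tasks
 * evaluate a script through the same engine or share the bindings placed into it.
 */
public class RunnerPool<R> {
    private final BlockingQueue<R> queue = new LinkedBlockingQueue<>();

    /** Discards any existing runners and creates {@code count} fresh ones (e.g. one per max concurrent task). */
    public void setup(int count, Supplier<R> factory) {
        queue.clear();
        for (int i = 0; i < count; i++) {
            queue.offer(factory.get());
        }
    }

    /** Takes a runner for exclusive use by the calling task; returns null if none is free. */
    public R borrow() {
        return queue.poll();
    }

    /** Puts a runner back so that another task can use it. */
    public void giveBack(R runner) {
        queue.offer(runner);
    }

    /** Number of runners currently idle in the pool. */
    public int available() {
        return queue.size();
    }
}
```

Because `poll()` never blocks, a task that finds the pool empty simply gets `null` rather than waiting; this mirrors how the diffs below call `scriptRunnerQ.poll()`.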
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r645878547



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -110,23 +101,26 @@ protected boolean reloadScript(final String scriptBody) {
         final Collection<ValidationResult> results = new HashSet<>();
 
         try {
+            // Create a single script engine, the Processor object is reused by each task
+            if (scriptRunner == null) {
+                scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+                scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+            }
+
+            if (scriptRunner == null) {
+                throw new ProcessException("No script engine available!");

Review comment:
       Probably, I'll search-and-replace these







[GitHub] [nifi] pgyori commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

pgyori commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r646786993



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/impl/GenericScriptRunner.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.script.impl;
+
+import javax.script.Bindings;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+
+/**
+ * This class offers methods to perform Javascript-specific operations during the script runner lifecycle.

Review comment:
       This is generic, not Javascript-specific.







[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r649350157



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);

Review comment:
       Good call. In the exception case, I'll create a new runner and assign it to scriptRunner so that we can do the offer in the finally clause.
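A minimal sketch of that approach, assuming a `BlockingQueue` of runners and a factory for new ones (`FinallyOfferSketch`, `Runner`, and `runTask` are hypothetical names, not the PR's code). Catching `Exception` rather than only `ScriptException` means the runner is also replaced when, for example, the session commit throws a `RuntimeException`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

public class FinallyOfferSketch {
    /** Hypothetical stand-in for a script runner: runs the script and commits the session. */
    public interface Runner {
        void run() throws Exception;
    }

    /**
     * Borrows a runner, runs it, and always puts a usable runner back. If the borrowed
     * runner fails with any exception, a freshly created runner is offered in its place,
     * so the pool keeps the same number of runners.
     */
    public static void runTask(BlockingQueue<Runner> pool, Supplier<Runner> factory) throws Exception {
        Runner runner = pool.poll();
        if (runner == null) {
            return; // no runner available
        }
        try {
            runner.run();
        } catch (Exception e) {
            runner = factory.get(); // discard the runner that failed
            throw e;                // still report the failure to the caller
        } finally {
            pool.offer(runner);     // runs on success and on failure
        }
    }
}
```

Doing the `offer` in `finally` keeps the return-to-pool logic in one place, so a new failure path cannot silently skip it.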







[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r645878908



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -190,53 +184,63 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
-                // where ActionHandler is a proxied interface
-                final Object obj = scriptEngine.get("actionHandler");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;
+            if (configurationContext != null) {
+                try {
+                    // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
+                    // where ActionHandler is a proxied interface
+                    final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
+                    if (obj != null) {
+                        try {
+                            invocable.invokeMethod(obj, "onEnabled", context);
+                        } catch (final NoSuchMethodException nsme) {
+                            if (getLogger().isDebugEnabled()) {
+                                getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+                            }
                         }
+                    } else {
+                        throw new ScriptException("No ActionHandler was defined by the script.");
                     }
-                } else {
-                    throw new ScriptException("No ActionHandler was defined by the script.");
+                } catch (ScriptException se) {
+                    throw new ProcessException("Error executing onEnabled(context) method", se);
                 }
-            } catch (ScriptException se) {
-                throw new ProcessException("Error executing onEnabled(context) method", se);
             }
+        } else {
+            throw new ProcessException("Error creating ScriptRunner");
         }

Review comment:
       I think the difference is in the method invoked (and its arguments). Maybe I can extract that out into a Function and reuse the common code.
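One way to share the common code, sketched under the assumption that only the method name and its arguments differ between the call sites. It parameterizes on the method name rather than passing a `java.util.function.Function`. `OptionalMethodInvoker` and `invokeOptional` are hypothetical; `Invocable.invokeMethod` is the standard `javax.script` API:

```java
import javax.script.Invocable;
import javax.script.ScriptException;

public class OptionalMethodInvoker {
    /**
     * Invokes {@code methodName} on an object defined by the script, treating a missing
     * method as optional. Returns true if the method was found and invoked, false if the
     * scripted object does not define it. A missing scripted object is an error.
     */
    public static boolean invokeOptional(Invocable invocable, Object scriptedObject, String objectKind,
                                         String methodName, Object... args) throws ScriptException {
        if (scriptedObject == null) {
            throw new ScriptException("No " + objectKind + " was defined by the script.");
        }
        try {
            invocable.invokeMethod(scriptedObject, methodName, args);
            return true;
        } catch (NoSuchMethodException nsme) {
            // Optional lifecycle method: the caller may log this at debug level
            return false;
        }
    }
}
```

A call site could then be `invokeOptional(invocable, scriptEngine.get("actionHandler"), "ActionHandler", "onEnabled", context)`, still wrapping any `ScriptException` in a `ProcessException` as the existing code does.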







[GitHub] [nifi] markap14 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

markap14 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r648313153



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
             } catch (ScriptException e) {
-                // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-                if (configurator != null) {
-                    configurator.reset();
-                }
+                // Create a new ScriptRunner to replace the one that caused an exception
+                scriptingComponentHelper.setupScriptRunners(false, 1, scriptToRun, getLogger());

Review comment:
       Similar to what @pgyori mentioned above: we need to ensure that we always account for the script runner. It makes sense to offer it in the 'good path' and do this in the bad path, but this is done only for `ScriptException`. It needs to be done for any Exception. It is very possible, for example, that the script doesn't properly account for all FlowFiles, and session.commitAsync() could then throw a RuntimeException. In that situation, you'd lose the script runner, and after X such failures (where X = number of concurrent tasks) the processor would just do nothing, constantly getting nothing from the poll() call.
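The failure mode described here can be reproduced with a small simulation, assuming a `BlockingQueue` pool sized to the number of concurrent tasks. `RunnerLeakDemo`, `ScriptTask`, and `leakyTrigger` are hypothetical names that only mirror the control flow of the diff above:

```java
import java.util.concurrent.BlockingQueue;
import javax.script.ScriptException;

public class RunnerLeakDemo {
    /** Hypothetical unit of work: evaluate the script, then commit the session. */
    public interface ScriptTask {
        void run() throws ScriptException;
    }

    /**
     * Mirrors the control flow under review: the runner is offered back on success and
     * replaced on ScriptException, but any RuntimeException skips both paths.
     */
    public static void leakyTrigger(BlockingQueue<Object> pool, ScriptTask task) {
        Object runner = pool.poll();
        if (runner == null) {
            return; // pool drained: the task silently does nothing
        }
        try {
            task.run();
            pool.offer(runner);       // good path
        } catch (ScriptException e) {
            pool.offer(new Object()); // replacement runner, but only for ScriptException
        }
        // A RuntimeException propagates from here without any offer, so the runner is lost
    }
}
```

With three runners, three `RuntimeException`s empty the pool; from then on every trigger returns immediately without running the script.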

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/script/ScriptRunnerFactory.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.script;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.script.ScriptRunner;
+import org.apache.nifi.script.impl.ClojureScriptRunner;
+import org.apache.nifi.script.impl.GenericScriptRunner;
+import org.apache.nifi.script.impl.GroovyScriptRunner;
+import org.apache.nifi.script.impl.JavascriptScriptRunner;
+import org.apache.nifi.script.impl.JythonScriptRunner;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineFactory;
+import javax.script.ScriptException;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ScriptRunnerFactory {
+
+    private final static ScriptRunnerFactory INSTANCE = new ScriptRunnerFactory();
+
+    private ScriptRunnerFactory() {
+
+    }
+
+    public static ScriptRunnerFactory getInstance() {
+        return INSTANCE;
+    }
+
+    public ScriptRunner createScriptRunner(ScriptEngineFactory scriptEngineFactory, String scriptToRun, String[] modulePaths)
+            throws ScriptException {
+        ScriptEngine scriptEngine = scriptEngineFactory.getScriptEngine();
+        String scriptEngineName = scriptEngineFactory.getLanguageName();
+        if ("Groovy".equals(scriptEngineName)) {
+            return new GroovyScriptRunner(scriptEngine, scriptToRun, null);
+        }
+        if ("python".equals(scriptEngineName)) {
+            return new JythonScriptRunner(scriptEngine, scriptToRun, modulePaths);
+        }
+        if ("Clojure".equals(scriptEngineName)) {
+            return new ClojureScriptRunner(scriptEngine, scriptToRun, null);
+        }
+        if ("ECMAScript".equals(scriptEngineName)) {
+            return new JavascriptScriptRunner(scriptEngine, scriptToRun, null);
+        }
+        return new GenericScriptRunner(scriptEngine, scriptToRun, null);
+    }
+
+    /**
+     * Scans the given module paths for JARs. The path itself (whether a directory or file) will be added to the list
+     * of returned module URLs, and if a directory is specified, it is scanned for JAR files (files ending with .jar).
+     * Any JAR files found are added to the list of module URLs. This is a convenience method for adding directories
+     * full of JAR files to an ExecuteScript or InvokeScriptedProcessor instance, rather than having to enumerate each
+     * JAR's URL.
+     *
+     * @param modulePaths An array of module paths to scan/add
+     * @param log         A logger for the calling component, to provide feedback for missing files, e.g.
+     * @return An array of URLs corresponding to all modules determined from the input set of module paths.
+     */
+    public URL[] getModuleURLsForClasspath(String scriptEngineName, String[] modulePaths, ComponentLog log) {
+
+        if ("Clojure".equals(scriptEngineName)
+                || "Groovy".equals(scriptEngineName)
+                || "ECMAScript".equals(scriptEngineName)) {
+
+            List<URL> additionalClasspath = new LinkedList<>();
+            if (modulePaths != null) {

Review comment:
       It may be worth refactoring this to use the "fail-fast" / "escape-fast" approach:
   ```
   if (modulePaths == null) {
     return new URL[0];
   }
   ```
   And so forth. That should result in minimal nesting and easier-to-read code, IMO.
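A sketch of what the fail-fast version of the scan might look like. `ModuleUrlScanner` is a hypothetical class, the `ComponentLog` parameter is omitted so the example is self-contained, and returning an empty array for other engines is an assumption, since the diff does not show that branch:

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class ModuleUrlScanner {
    /**
     * Fail-fast version of the module scan: each guard returns or continues early,
     * so the main logic stays at a shallow nesting level.
     */
    public static URL[] getModuleURLs(String scriptEngineName, String[] modulePaths) throws MalformedURLException {
        boolean usesClasspathModules = "Clojure".equals(scriptEngineName)
                || "Groovy".equals(scriptEngineName)
                || "ECMAScript".equals(scriptEngineName);
        if (!usesClasspathModules || modulePaths == null) {
            return new URL[0]; // assumption: other engines get no classpath URLs
        }
        List<URL> urls = new ArrayList<>();
        for (String modulePath : modulePaths) {
            File file = new File(modulePath);
            if (!file.exists()) {
                continue; // the real method takes a ComponentLog and could warn here
            }
            urls.add(file.toURI().toURL()); // the path itself is always added
            if (!file.isDirectory()) {
                continue;
            }
            File[] jars = file.listFiles((dir, name) -> name.endsWith(".jar"));
            if (jars == null) {
                continue; // directory could not be listed
            }
            for (File jar : jars) {
                urls.add(jar.toURI().toURL());
            }
        }
        return urls.toArray(new URL[0]);
    }
}
```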







[GitHub] [nifi] pgyori commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

pgyori commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r645662468



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
             } catch (ScriptException e) {
-                // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-                if (configurator != null) {
-                    configurator.reset();
-                }
+                // Create a new ScriptRunner to replace the one that caused an exception
+                scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());

Review comment:
       If I understand correctly, if we have multiple ScriptRunners in the queue, but one causes an exception, we get rid of all the ScriptRunners in the queue and create a queue with only 1 ScriptRunner in it. Do we not need maxTasks number of them?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
##########
@@ -217,26 +211,31 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
-                // where RecordSinkService is a proxied interface
-                final Object obj = scriptEngine.get("recordSink");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;

Review comment:
       In the reloadScript() method there is a check:
   `if (scriptEngine instanceof Invocable)`
   Is it not needed here as well?
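If the check is needed here too, one option is to perform it in a small helper before casting. The sketch below is hypothetical (`InvocableGuard` and `requireInvocable` are not from the PR). The parameter is typed as `Object` only so the helper can be exercised without a real engine; in the component it would receive `scriptRunner.getScriptEngine()`:

```java
import javax.script.Invocable;
import javax.script.ScriptException;

public class InvocableGuard {
    /**
     * Returns the engine as an Invocable, failing with a descriptive error instead of a
     * ClassCastException when the engine cannot invoke methods on scripted objects.
     */
    public static Invocable requireInvocable(Object scriptEngine) throws ScriptException {
        if (scriptEngine instanceof Invocable) {
            return (Invocable) scriptEngine;
        }
        String type = (scriptEngine == null) ? "null" : scriptEngine.getClass().getName();
        throw new ScriptException("Script engine " + type + " does not implement Invocable");
    }
}
```

Throwing `ScriptException` keeps the error inside the existing `catch (ScriptException se)` block, which already converts it to a `ProcessException`.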

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -190,53 +184,63 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
-                // where ActionHandler is a proxied interface
-                final Object obj = scriptEngine.get("actionHandler");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;
+            if (configurationContext != null) {
+                try {
+                    // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
+                    // where ActionHandler is a proxied interface
+                    final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
+                    if (obj != null) {
+                        try {
+                            invocable.invokeMethod(obj, "onEnabled", context);
+                        } catch (final NoSuchMethodException nsme) {
+                            if (getLogger().isDebugEnabled()) {
+                                getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+                            }
                         }
+                    } else {
+                        throw new ScriptException("No ActionHandler was defined by the script.");
                     }
-                } else {
-                    throw new ScriptException("No ActionHandler was defined by the script.");
+                } catch (ScriptException se) {
+                    throw new ProcessException("Error executing onEnabled(context) method", se);
                 }
-            } catch (ScriptException se) {
-                throw new ProcessException("Error executing onEnabled(context) method", se);
             }
+        } else {
+            throw new ProcessException("Error creating ScriptRunner");
         }

Review comment:
       This part is very similar to what is in the execute() method. Can it be extracted into a separate method to avoid duplication?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -110,23 +101,26 @@ protected boolean reloadScript(final String scriptBody) {
         final Collection<ValidationResult> results = new HashSet<>();
 
         try {
+            // Create a single script engine, the Processor object is reused by each task
+            if (scriptRunner == null) {
+                scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+                scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+            }
+
+            if (scriptRunner == null) {
+                throw new ProcessException("No script engine available!");

Review comment:
       Should this say "script runner" rather than "script engine"?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/engine/script/ScriptedRulesEngine.java
##########
@@ -108,20 +99,23 @@ protected boolean reloadScript(final String scriptBody) {
         final Collection<ValidationResult> results = new HashSet<>();
 
         try {
+            // Create a single script engine, the Processor object is reused by each task
+            if (scriptRunner == null) {
+                scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
+                scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
+            }
+
+            if (scriptRunner == null) {
+                throw new ProcessException("No script engine available!");

Review comment:
       Should this say "script runner" rather than "script engine"?







[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r646735130



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
             } catch (ScriptException e) {
-                // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-                if (configurator != null) {
-                    configurator.reset();
-                }
+                // Create a new ScriptRunner to replace the one that caused an exception
+                scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());

Review comment:
       Successful tasks offer their runner back to the queue; unsuccessful ones should replace their runner with a new one.
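A minimal Java sketch of that borrow/return-or-replace policy, for illustration only. `Runner` and `RunnerPool` are hypothetical names standing in for ScriptRunner and the helper's queue. Only `javax.script.ScriptException` and the `java.util.concurrent` types are real APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import javax.script.ScriptException;

// Hypothetical stand-in for NiFi's ScriptRunner; not the real interface.
interface Runner {
    void run(String input) throws ScriptException;
}

// Hypothetical pool: each concurrent task borrows one runner at a time.
class RunnerPool {
    private final BlockingQueue<Runner> queue = new LinkedBlockingQueue<>();
    private final Supplier<Runner> factory;

    RunnerPool(int size, Supplier<Runner> factory) {
        this.factory = factory;
        for (int i = 0; i < size; i++) {
            queue.offer(factory.get());
        }
    }

    int available() {
        return queue.size();
    }

    void execute(String input) throws ScriptException {
        Runner runner = queue.poll();
        if (runner == null) {
            throw new IllegalStateException("No script runner available");
        }
        try {
            runner.run(input);
        } catch (ScriptException e) {
            // Do not reuse a runner whose script failed; add a fresh one instead.
            queue.offer(factory.get());
            throw e;
        }
        // Success: hand the same runner back for the next task.
        queue.offer(runner);
    }
}
```

Note that only `ScriptException` triggers replacement in this sketch; any other exception thrown by `run` would leave the pool one runner short.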







[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r646650078



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
##########
@@ -217,26 +211,31 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
-                // where RecordSinkService is a proxied interface
-                final Object obj = scriptEngine.get("recordSink");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;

Review comment:
       I decided to update the `createResources()` method to take a boolean (defaulting to true) indicating whether the supported script engines must be Invocable. I'll change ExecuteScript to call it with false; the other components will only offer a choice of Invocable engines, so the individual checks should be unnecessary.
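A rough sketch of how such a flag might filter the engine list, assuming the choices come from `ScriptEngineManager`. `EngineSelector` and `supportedEngines` are hypothetical names, not the actual `createResources()` signature, and using `getLanguageName()` as the display value is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;

class EngineSelector {
    // Hypothetical helper: list the engines a component may offer,
    // optionally keeping only those whose engines implement Invocable.
    static List<String> supportedEngines(List<ScriptEngineFactory> factories, boolean requireInvocable) {
        List<String> names = new ArrayList<>();
        for (ScriptEngineFactory factory : factories) {
            ScriptEngine engine = factory.getScriptEngine();
            if (!requireInvocable || engine instanceof Invocable) {
                names.add(factory.getLanguageName());
            }
        }
        return names;
    }

    // Convenience overload using whatever engines are on the classpath.
    static List<String> supportedEngines(boolean requireInvocable) {
        return supportedEngines(new ScriptEngineManager().getEngineFactories(), requireInvocable);
    }
}
```

Instantiating each engine just to test `instanceof Invocable` can be costly, so a real implementation might do this once and cache the result.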







[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r646734198



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/rules/handlers/script/ScriptedActionHandler.java
##########
@@ -190,53 +184,63 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
-                // where ActionHandler is a proxied interface
-                final Object obj = scriptEngine.get("actionHandler");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;
+            if (configurationContext != null) {
+                try {
+                    // Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
+                    // where ActionHandler is a proxied interface
+                    final Object obj = scriptRunner.getScriptEngine().get("actionHandler");
+                    if (obj != null) {
+                        try {
+                            invocable.invokeMethod(obj, "onEnabled", context);
+                        } catch (final NoSuchMethodException nsme) {
+                            if (getLogger().isDebugEnabled()) {
+                                getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
+                            }
                         }
+                    } else {
+                        throw new ScriptException("No ActionHandler was defined by the script.");
                     }
-                } else {
-                    throw new ScriptException("No ActionHandler was defined by the script.");
+                } catch (ScriptException se) {
+                    throw new ProcessException("Error executing onEnabled(context) method", se);
                 }
-            } catch (ScriptException se) {
-                throw new ProcessException("Error executing onEnabled(context) method", se);
             }
+        } else {
+            throw new ProcessException("Error creating ScriptRunner");
         }

Review comment:
       It looks like a number of scripted components use this same pattern (they have to get the Object from the script engine instead of using a proxied object that only lets you invoke the interface methods). I think we should refactor those under a separate improvement Jira.
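One possible shape for that follow-up refactoring, as a hypothetical helper (the name `invokeIfPresent` and its boolean return convention are assumptions, not existing NiFi code). It captures the repeated logic: fail if the script defined no object, invoke the optional method if it exists, and report when it does not. `Invocable.invokeMethod` is the real `javax.script` API.

```java
import javax.script.Invocable;
import javax.script.ScriptException;

class OptionalInvoker {
    /**
     * Hypothetical helper: invoke an optional method on the object the script defined.
     * Returns true if the method was invoked, false if the script object lacks it.
     */
    static boolean invokeIfPresent(Invocable invocable, Object scriptObject, String methodName, Object... args)
            throws ScriptException {
        if (scriptObject == null) {
            // Mirrors the "No ... was defined by the script." checks in each component.
            throw new ScriptException("No object was defined by the script.");
        }
        try {
            invocable.invokeMethod(scriptObject, methodName, args);
            return true;
        } catch (NoSuchMethodException nsme) {
            // The method is optional, so a missing one is not an error.
            return false;
        }
    }
}
```

Each component would then only need to log its debug message when the helper returns `false`.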







[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r645878298



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/sink/script/ScriptedRecordSink.java
##########
@@ -217,26 +211,31 @@ public void onEnabled(final ConfigurationContext context) {
         super.onEnabled(context);
 
         // Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary
-        final Invocable invocable = (Invocable) scriptEngine;
-        if (configurationContext != null) {
-            try {
-                // Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
-                // where RecordSinkService is a proxied interface
-                final Object obj = scriptEngine.get("recordSink");
-                if (obj != null) {
-                    try {
-                        invocable.invokeMethod(obj, "onEnabled", context);
-                    } catch (final NoSuchMethodException nsme) {
-                        if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
+        if (scriptRunner != null) {
+            final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
+            final Invocable invocable = (Invocable) scriptEngine;

Review comment:
       Yeah, there might be a couple of spots where the check should be done; it's probably a copy-paste error. I'll take a look. Good catch!
   
   I think the script engines we include that can actually subclass a Processor or other component are all Invocable, but we should check here defensively.
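A minimal sketch of such a defensive check, assuming a hypothetical `requireInvocable` helper (not existing NiFi code). It turns a would-be `ClassCastException` into an error message that names the offending engine.

```java
import javax.script.Invocable;
import javax.script.ScriptEngine;

class EngineChecks {
    // Hypothetical helper: check before casting instead of casting blindly.
    static Invocable requireInvocable(ScriptEngine engine, String engineName) {
        if (engine instanceof Invocable) {
            return (Invocable) engine;
        }
        // Also covers a null engine, since null is never an instance of Invocable.
        throw new IllegalStateException("Script engine '" + engineName
                + "' is not Invocable, so it cannot be used to implement this component");
    }
}
```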







[GitHub] [nifi] pgyori commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r646664759



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -223,14 +195,15 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                 scriptingComponentHelper.createResources();

Review comment:
       This is not strictly related to this change, but I found that if there is 1 flowfile in the input queue and we start the processor with, e.g., 3 concurrent threads, then onTrigger is fully executed 3 times and only 1 of those executions has a flowfile to process. Usually this is avoided by putting the following in onTrigger:
   
       FlowFile incomingFlowFile = session.get();
       if (incomingFlowFile == null) {
           return;
       }
   
   However, we cannot call session.get() in onTrigger here, since getting the flowfile is the script's job. I did not find any way to check at the beginning of onTrigger whether the incoming queue is empty, so that we could skip the full execution in that case. Do you know of a solution?

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);

Review comment:
       If this line is not in a finally block, then I think we might lose a ScriptRunner if something throws a non-ScriptException Throwable in this method.
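A sketch of the `finally` variant, with hypothetical `PooledRunner`/`SafeRunnerPool` types in place of the real classes. A flag records whether the script completed, and the `finally` block either returns the runner or replaces it, so the pool keeps its size whatever the task throws (`ScriptException`, another `RuntimeException`, or an `Error`).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import javax.script.ScriptException;

// Hypothetical stand-in for NiFi's ScriptRunner.
interface PooledRunner {
    void run() throws ScriptException;
}

class SafeRunnerPool {
    final BlockingQueue<PooledRunner> queue = new LinkedBlockingQueue<>();
    private final Supplier<PooledRunner> factory;

    SafeRunnerPool(int size, Supplier<PooledRunner> factory) {
        this.factory = factory;
        for (int i = 0; i < size; i++) {
            queue.offer(factory.get());
        }
    }

    void execute() throws ScriptException {
        PooledRunner runner = queue.poll();
        if (runner == null) {
            throw new IllegalStateException("No script runner available");
        }
        boolean healthy = false;
        try {
            runner.run();
            healthy = true;
        } finally {
            // Runs on success and on any Throwable: return the healthy runner,
            // or replace a suspect one, so no runner is ever lost.
            queue.offer(healthy ? runner : factory.get());
        }
    }
}
```

One caveat with this design: if creating the replacement runner itself throws inside `finally`, that exception masks the original one.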







[GitHub] [nifi] markap14 merged pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
markap14 merged pull request #5116:
URL: https://github.com/apache/nifi/pull/5116


   





[GitHub] [nifi] mattyb149 commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r645877344



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
             } catch (ScriptException e) {
-                // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-                if (configurator != null) {
-                    configurator.reset();
-                }
+                // Create a new ScriptRunner to replace the one that caused an exception
+                scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());

Review comment:
       The idea was not to offer the troublesome runner back to the queue, but to create a new one and offer that instead, so the queue always stays full of runners that are hopefully good.







[GitHub] [nifi] pgyori commented on a change in pull request #5116: NIFI-8625: Refactor scripted components to use ScriptRunner to fix concurrency issues

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #5116:
URL: https://github.com/apache/nifi/pull/5116#discussion_r646653878



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
##########
@@ -253,31 +226,16 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
                     }
                 }
 
-                scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
-
-                // Execute any engine-specific configuration before the script is evaluated
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-
-                // Evaluate the script with the configurator (if it exists) or the engine
-                if (configurator != null) {
-                    configurator.init(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                    configurator.eval(scriptEngine, scriptToRun, scriptingComponentHelper.getModules());
-                } else {
-                    scriptEngine.eval(scriptToRun);
-                }
+                scriptRunner.run(bindings);
 
                 // Commit this session for the user. This plus the outermost catch statement mimics the behavior
                 // of AbstractProcessor. This class doesn't extend AbstractProcessor in order to share a base
                 // class with InvokeScriptedProcessor
                 session.commitAsync();
+                scriptingComponentHelper.scriptRunnerQ.offer(scriptRunner);
             } catch (ScriptException e) {
-                // Reset the configurator on error, this can indicate to the configurator to recompile the script on next init()
-                ScriptEngineConfigurator configurator =
-                        scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
-                if (configurator != null) {
-                    configurator.reset();
-                }
+                // Create a new ScriptRunner to replace the one that caused an exception
+                scriptingComponentHelper.setupScriptRunners(1, scriptToRun, getLogger());

Review comment:
       That makes perfect sense. However, ExecuteScript has 1 instance of ScriptingComponentHelper. If the ExecuteScript processor is set to use 3 concurrent tasks, then 3 ScriptRunner instances get stored in the scriptingComponentHelper.scriptRunnerQ queue. If one ScriptRunner causes an exception, the scriptingComponentHelper.scriptRunnerQ queue gets replaced with a new queue with 1 element in it. Or am I missing something here?
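To make the concern concrete, here is a simplified model that assumes `setupScriptRunners` assigns a brand-new queue, as described above. The class and method names are hypothetical, and runners are represented by plain strings. It contrasts that behavior with offering the replacement to the existing queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical simplification of the helper's shared runner queue.
class HelperSketch {
    volatile BlockingQueue<String> runnerQ = new LinkedBlockingQueue<>();

    // Behavior under discussion: building a new queue discards every runner
    // still waiting in the old one.
    void setupRunnersReplacingQueue(int n) {
        BlockingQueue<String> fresh = new LinkedBlockingQueue<>();
        for (int i = 0; i < n; i++) {
            fresh.offer("runner");
        }
        runnerQ = fresh;
    }

    // Alternative: add replacements to the existing queue, preserving its size.
    void addRunners(int n) {
        for (int i = 0; i < n; i++) {
            runnerQ.offer("runner");
        }
    }
}
```

With 3 runners and one failing task, the first approach leaves a queue of 1, while the second restores it to 3.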



