You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/07/01 17:10:50 UTC

[GitHub] [nifi] mattyb149 commented on a change in pull request #4374: NIFI-7572: Added ScriptedTransformRecord processor. Addressed a coupl…

mattyb149 commented on a change in pull request #4374:
URL: https://github.com/apache/nifi/pull/4374#discussion_r448497317



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.script;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.script.ScriptingComponentHelper;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.search.SearchContext;
+import org.apache.nifi.search.SearchResult;
+import org.apache.nifi.search.Searchable;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.python.jsr223.PyScriptEngine;
+
+import javax.script.Bindings;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"record", "transform", "script", "groovy", "jython", "python", "update", "modify", "filter"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
+    @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
+})
+@CapabilityDescription("Provides the ability to evaluate a simple script against each record in an incoming FlowFile. The script may transform the record in some way, filter the record, or fork " +
+    "additional records. See Processor's Additional Details for more information.")
+@SeeAlso(classNames = {"org.apache.nifi.processors.script.ExecuteScript",
+    "org.apache.nifi.processors.standard.UpdateRecord",
+    "org.apache.nifi.processors.standard.QueryRecord",
+    "org.apache.nifi.processors.standard.JoltTransformRecord",
+    "org.apache.nifi.processors.standard.LookupRecord"})
+public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {

Review comment:
       As a scripted component it should have an `@Restricted` annotation for `EXECUTE_CODE` permissions

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/docs/org.apache.nifi.processors.script.ScriptedTransformRecord/additionalDetails.html
##########
@@ -0,0 +1,412 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>ScriptedTransformRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+h2 {margin-top: 4em}
+h3 {margin-top: 3em}
+td {text-align: left}
+    </style>
+</head>
+
+<body>
+
+<h1>ScriptedTransformRecord</h1>
+
+<h3>Description</h3>
+
+<p>
+    The ScriptedTransformRecord provides the ability to use a scripting language, such as Groovy or Jython, to quickly and easily update the contents of a Record.
+    NiFi provides several different Processors that can be used to manipulate Records in different ways. Each of these processors has its pros and cons. The ScriptedTransformRecord is
+    the most powerful and most versatile option. However, it is also the most error-prone, as it depends on writing custom scripts. It is also likely to yield the lowest performance,
+    as processors and libraries written directly in Java are likely to perform better than interpreted scripts.
+</p>
+
+<p>
+    When creating a script, it is important to note that, unlike ExecuteScript, this Processor does not allow the script itself to expose Properties to be configured or define Relationships.
+    This is a deliberate decision. If it is necessary to expose such configuration, the ExecuteScript processor should be used instead. By not exposing these elements,
+    the script avoids the need to define a Class or implement methods with a specific method signature. Instead, the script can avoid any boilerplate code and focus purely on the task
+    at hand.
+</p>
+
+<p>
+    The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return a Record object
+    (See note below regarding <a href="#ReturnValue">Return Values</a>).
+    That Record is then written using the configured Record Writer. If the script returns a <code>null</code> value, the Record will not be written. If the script returns an object that is not
+    a Record, the incoming FlowFile will be routed to the <code>failure</code> relationship.
+</p>
+
+<p>
+    This processor maintains two Counters: "Records Transformed" indicating the number of Records that were passed to the script and for which the script returned a Record, and "Records Dropped"
+    indicating the number of Records that were passed to the script and for which the script returned a value of <code>null</code>.
+</p>
+
+
+
+<h3>Variable Bindings</h3>
+
+<p>
+    While the script provided to this Processor does not need to provide boilerplate code or implement any classes/interfaces, it does need some way to access the Records and other information
+    that it needs in order to perform its task. This is accomplished by using Variable Bindings. Each time that the script is invoked, each of the following variables will be made
+    available to the script:
+</p>
+
+<table>
+    <tr>
+        <th>Variable Name</th>
+        <th>Description</th>
+        <th>Variable Class</th>
+    </tr>
+    <tr>
+        <td>record</td>
+        <td>The Record that is to be transformed.</td>
+        <td><a href="https://javadoc.io/static/org.apache.nifi/nifi-record/1.11.4/org/apache/nifi/serialization/record/Record.html">Record</a></td>
+    </tr>
+    <tr>
+        <td>recordIndex</td>
+        <td>The zero-based index of the Record in the FlowFile.</td>
+        <td>Long (64-bit signed integer)</td>
+    </tr>
+    <tr>
+        <td>log</td>
+        <td>The Processor's Logger. Anything that is logged to this logger will be written to the logs as if the Processor itself had logged it. Additionally, a bulletin will be created for any
+            log message written to this logger (though by default, the Processor will hide any bulletins with a level below WARN).</td>
+        <td><a href="https://www.javadoc.io/doc/org.apache.nifi/nifi-api/latest/org/apache/nifi/logging/ComponentLog.html">ComponentLog</a></td>
+    </tr>
+    <tr>
+        <td>attributes</td>
+        <td>Map of key/value pairs that are the Attributes of the FlowFile. Both the keys and the values of this Map are of type String. This Map is immutable.
+            Any attempt to modify it will result in an UnsupportedOperationException being thrown.</td>
+        <td>java.util.Map</td>
+    </tr>
+</table>
+
+
+<a name="ReturnValue"></a>
+<h3>Return Value</h3>
+
+<p>
+    Each time that the script is invoked, it is expected to return a
+    <a href="https://javadoc.io/static/org.apache.nifi/nifi-record/1.11.4/org/apache/nifi/serialization/record/Record.html">Record</a> object.
+    That Record is then written using the configured Record Writer. If the script returns a <code>null</code> value, the Record will not be written. If the script returns an object that is not

Review comment:
       Should add a sentence here describing how a Collection of Records can be returned and all will be written out.

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedTransformRecord.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.script;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.script.ScriptingComponentHelper;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.search.SearchContext;
+import org.apache.nifi.search.SearchResult;
+import org.apache.nifi.search.Searchable;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.python.jsr223.PyScriptEngine;
+
+import javax.script.Bindings;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"record", "transform", "script", "groovy", "jython", "python", "update", "modify", "filter"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
+    @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
+})
+@CapabilityDescription("Provides the ability to evaluate a simple script against each record in an incoming FlowFile. The script may transform the record in some way, filter the record, or fork " +
+    "additional records. See Processor's Additional Details for more information.")
+@SeeAlso(classNames = {"org.apache.nifi.processors.script.ExecuteScript",
+    "org.apache.nifi.processors.standard.UpdateRecord",
+    "org.apache.nifi.processors.standard.QueryRecord",
+    "org.apache.nifi.processors.standard.JoltTransformRecord",
+    "org.apache.nifi.processors.standard.LookupRecord"})
+public class ScriptedTransformRecord extends AbstractProcessor implements Searchable {
+    private static final String PYTHON_SCRIPT_LANGUAGE = "python";
+    private static final Set<String> SCRIPT_OPTIONS = ScriptingComponentUtils.getAvailableEngines();
+
+    static final PropertyDescriptor RECORD_READER = new Builder()
+        .name("Record Reader")
+        .displayName("Record Reader")
+        .description("The Record Reader to use parsing the incoming FlowFile into Records")
+        .required(true)
+        .identifiesControllerService(RecordReaderFactory.class)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new Builder()
+        .name("Record Writer")
+        .displayName("Record Writer")
+        .description("The Record Writer to use for serializing Records after they have been transformed")
+        .required(true)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+    static final PropertyDescriptor LANGUAGE = new Builder()
+        .name("Script Engine")
+        .displayName("Script Language")
+        .description("The Language to use for the script")
+        .allowableValues(SCRIPT_OPTIONS)
+        .defaultValue("Groovy")
+        .required(true)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Each FlowFile that were successfully transformed will be routed to this Relationship")
+        .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be transformed will be routed to this Relationship")
+        .build();
+
+
+    private volatile String scriptToRun = null;
+    private volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();

Review comment:
       Nitpick but this can be final :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org