Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/06/10 14:55:08 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #4948: NIFI-8273 Adding Scripted Record processors

markap14 commented on a change in pull request #4948:
URL: https://github.com/apache/nifi/pull/4948#discussion_r648689913



##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedPartitionRecord.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+@Tags({"record", "partition", "script", "groovy", "jython", "python", "segment", "split", "group", "organize"})
+@CapabilityDescription(
+        "This processor provides the ability to partition and route the records of the incoming FlowFile using an user-provided script. " +
+        "The script is expected to handle a record as argument and return with a string value. " +
+        "The returned value defines which partition the given record should end up. " +
+        "Partitions are defined as dynamic properties: dynamic property names are serving as partitions known by the processor. " +

Review comment:
       I think it's important here to avoid conflating the ideas of 'partitioning' and 'routing'. I think this should operate in much the same way as PartitionRecord - 3 relationships: success, failure, original. An attribute would be added indicating the partition and all FlowFiles would be routed to success. If routing is desirable, it can be done via RouteOnAttribute. I would avoid any dynamic properties at all.
   
   This results in a much simpler processor (code-wise) and a cleaner, simpler, more consistent user experience.
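
A minimal sketch of the partition-as-attribute idea described above, using plain Java stand-ins rather than the NiFi API. The `Output` class, the `partition` attribute name, and the `success` relationship string are assumptions made for illustration; they are not taken from the pull request or from PartitionRecord itself.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PartitionSketch {

    /** Hypothetical stand-in for an outgoing FlowFile. */
    static final class Output {
        // Every partition goes to the same relationship; no dynamic relationships.
        final String relationship = "success";
        final Map<String, String> attributes = new LinkedHashMap<>();
        // Stand-in for FlowFile content that a per-partition writer would stream to.
        final StringBuilder content = new StringBuilder();
    }

    /**
     * Groups records by the value returned from the script and tags each
     * output with the partition it belongs to.
     */
    static Map<String, Output> partition(final List<String> records,
                                         final Function<String, String> script) {
        final Map<String, Output> outputs = new LinkedHashMap<>();
        for (final String record : records) {
            final String partition = String.valueOf(script.apply(record));
            final Output output = outputs.computeIfAbsent(partition, p -> {
                final Output created = new Output();
                created.attributes.put("partition", p); // attribute name is an assumption
                return created;
            });
            // The record is written out immediately instead of being collected.
            output.content.append(record).append('\n');
        }
        return outputs;
    }
}
```

With this shape, routing becomes a downstream concern: a RouteOnAttribute processor can inspect the partition attribute if different partitions need different paths.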

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use parsing the incoming FlowFile into Records")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use for serializing Records after they have been transformed")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
+            .name("Script Engine")
+            .displayName("Script Language")
+            .description("The Language to use for the script")
+            .allowableValues(SCRIPT_OPTIONS)
+            .defaultValue("Groovy")
+            .required(true)
+            .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(
+            RECORD_READER,
+            RECORD_WRITER,
+            LANGUAGE,
+            ScriptingComponentUtils.SCRIPT_BODY,
+            ScriptingComponentUtils.SCRIPT_FILE,
+            ScriptingComponentUtils.MODULES);
+
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptEngine scriptEngine = pollScriptEngine();
+        if (scriptEngine == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailedRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptEngine(scriptEngine);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : getFailedRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet);
+                        final Map<Relationship, List<Record>> outgoingRecords = new HashMap<>();
+                        int index = 0;
+
+                        // Reading in records and evaluate script
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = evaluator.evaluate(record, index++);
+
+                            if (evaluatedValue != null && scriptResultType.isInstance(evaluatedValue)) {
+                                final Optional<Relationship> outgoingRelationship = resolveRelationship(scriptResultType.cast(evaluatedValue));
+
+                                if (outgoingRelationship.isPresent()) {
+                                    if (!outgoingRecords.containsKey(outgoingRelationship.get())) {
+                                        outgoingRecords.put(outgoingRelationship.get(), new LinkedList<>());
+                                    }
+
+                                    outgoingRecords.get(outgoingRelationship.get()).add(record);
+                                } else {
+                                    getLogger().debug("Record with evaluated value {} has no outgoing relationship determined", String.valueOf(evaluatedValue));
+                                }
+                            } else {
+                                throw new ProcessException("Script result is not applicable: " + String.valueOf(evaluatedValue));

Review comment:
       It's probably not going to be very clear to the user what this statement means: "is not applicable." Perhaps it makes more sense to use a message such as: "Script returned a value of " + evaluatedValue + " but this Processor requires that the object returned be an instance of " + getScriptReturnType()
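
One possible way to build such a message, as a sketch only. The helper class and method names are hypothetical, and including the actual type of the returned value is an addition that the comment above does not ask for.

```java
class ScriptResultMessage {

    /**
     * Builds an error message that names both the value the script returned
     * and the type the processor expected.
     */
    static String describe(final Object evaluatedValue, final Class<?> expectedType) {
        final String actualType = evaluatedValue == null
                ? "null"
                : evaluatedValue.getClass().getName();
        return "Script returned a value of " + evaluatedValue
                + " (type " + actualType + ")"
                + " but this Processor requires that the object returned be an instance of "
                + expectedType.getName();
    }
}
```

In the processor this string would be passed to the `ProcessException` constructor in place of "Script result is not applicable".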

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestScriptedPartitionRecord.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TestScriptedPartitionRecord extends TestScriptedRouterProcessor {
+    private static final String SCRIPT =

Review comment:
       It is best to externalize this into a file in `src/test/resources`
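
A sketch of what loading an externalized script could look like in the test, using only the JDK. The helper class name and the example file path are hypothetical; the pull request does not define either.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class ScriptFiles {

    /**
     * Reads a script file as UTF-8 text, for example a (hypothetical)
     * src/test/resources/groovy/test_scripted_partition_record.groovy.
     */
    static String readScript(final Path scriptPath) {
        try {
            return new String(Files.readAllBytes(scriptPath), StandardCharsets.UTF_8);
        } catch (final IOException e) {
            // Unchecked so that a test field initializer can call this directly.
            throw new UncheckedIOException("Could not read script " + scriptPath, e);
        }
    }
}
```

The test could then either pass the loaded text to the script body property or point the script file property at the path, which keeps the Groovy source out of Java string concatenation.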

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestScriptedRouterProcessor.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.UUID;
+
+abstract class TestScriptedRouterProcessor {
+    private static final String HEADER = "header§";
+
+    protected TestRunner testRunner;
+    protected MockRecordParser recordReader;
+    protected MockRecordWriter recordWriter;
+    protected String incomingFlowFileContent;
+
+    @Before
+    public void setUp() throws Exception {
+        testRunner = TestRunners.newTestRunner(givenProcessorType());
+
+        testRunner.setProperty(ScriptedTransformRecord.RECORD_READER, "record-reader");
+        testRunner.setProperty(ScriptedTransformRecord.RECORD_WRITER, "record-writer");
+
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("first", RecordFieldType.INT);
+        recordReader.addSchemaField("second", RecordFieldType.STRING);
+
+        recordWriter = new MockRecordWriter(HEADER);
+
+        testRunner.addControllerService("record-reader", recordReader);
+        testRunner.addControllerService("record-writer", recordWriter);
+
+        testRunner.setProperty(ScriptingComponentUtils.SCRIPT_BODY, getScript());
+        testRunner.setProperty(ScriptedTransformRecord.LANGUAGE, "Groovy");
+
+        testRunner.enableControllerService(recordReader);
+        testRunner.enableControllerService(recordWriter);
+
+        incomingFlowFileContent = UUID.randomUUID().toString();
+    }
+
+    protected void whenTriggerProcessor() {
+        testRunner.enqueue(incomingFlowFileContent);
+        testRunner.run();
+    }
+
+    protected void thenIncomingFlowFileIsRoutedToOriginal() {
+        testRunner.assertTransferCount(getOriginalRelationship(), 1);
+        testRunner.assertTransferCount(getFailedRelationship(), 0);
+        Assert.assertEquals(incomingFlowFileContent, testRunner.getFlowFilesForRelationship(getOriginalRelationship()).get(0).getContent());
+    }
+
+    protected void thenIncomingFlowFileIsRoutedToFailed() {
+        testRunner.assertTransferCount(getOriginalRelationship(), 0);
+        testRunner.assertTransferCount(getFailedRelationship(), 1);
+        Assert.assertEquals(incomingFlowFileContent, testRunner.getFlowFilesForRelationship(getFailedRelationship()).get(0).getContent());
+    }
+
+    /**
+     * Generates the expected flow file content based on the records. Results the same format as the {@code MockRecordWriter} uses.
+     */
+    protected String givenExpectedFlowFile(final Object[][] records) {

Review comment:
       I would consider using `Object[]...` as the argument type; it makes for cleaner usage.
   I would also name this method something like `buildFlowFileContent`, since `givenExpectedFlowFile` says nothing about what the method does.
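
A sketch of the suggested varargs signature. The output format used here (a header line followed by comma-separated, double-quoted values) is an assumption about what `MockRecordWriter` produces and should be checked against the real writer; the enclosing class name and the explicit `header` parameter are also hypothetical.

```java
class FlowFileContentBuilder {

    /**
     * Builds the expected FlowFile content from any number of records,
     * each given as an array of field values.
     */
    static String buildFlowFileContent(final String header, final Object[]... records) {
        final StringBuilder content = new StringBuilder(header).append('\n');
        for (final Object[] record : records) {
            for (int i = 0; i < record.length; i++) {
                if (i > 0) {
                    content.append(',');
                }
                // Assumed format: every value is wrapped in double quotes.
                content.append('"').append(record[i]).append('"');
            }
            content.append('\n');
        }
        return content.toString();
    }
}
```

Callers can then write `buildFlowFileContent(HEADER, new Object[] {1, "lorem"}, new Object[] {2, "ipsum"})` instead of wrapping the records in an outer `Object[][]` literal.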

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedValidateRecord.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+@Tags({"record", "validate", "script", "groovy", "jython", "python"})
+@CapabilityDescription(
+        "This processor provides the ability to validate records in FlowFiles using the user-provided script. " +
+        "The script is expected to have a record as incoming argument and return with a boolean value. " +
+        "Based on this result, the processor categorizes the records as \"valid\" or \"invalid\" and routes them to the respected relationship in batch. " +

Review comment:
       I think you meant 'respective' relationship instead of 'respected'

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use parsing the incoming FlowFile into Records")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use for serializing Records after they have been transformed")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
+            .name("Script Engine")
+            .displayName("Script Language")
+            .description("The Language to use for the script")
+            .allowableValues(SCRIPT_OPTIONS)
+            .defaultValue("Groovy")
+            .required(true)
+            .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(
+            RECORD_READER,
+            RECORD_WRITER,
+            LANGUAGE,
+            ScriptingComponentUtils.SCRIPT_BODY,
+            ScriptingComponentUtils.SCRIPT_FILE,
+            ScriptingComponentUtils.MODULES);
+
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptEngine scriptEngine = pollScriptEngine();
+        if (scriptEngine == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailedRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptEngine(scriptEngine);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : getFailedRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet);
+                        final Map<Relationship, List<Record>> outgoingRecords = new HashMap<>();

Review comment:
       One of the main benefits of record-oriented processors is that they allow users to send in huge flowfiles containing thousands or millions of Records. We should never have a processor that buffers Records (with the exception of buffering up some limited number in order to push a batch to an external system, for example).
   
   Rather than keeping a `Map<Relationship, List<Record>>` we should be holding a `Map<Relationship, RecordSetWriter>`. Then, for each Record, it should be written to the appropriate RecordSetWriter and not held in memory.
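
The writer-per-relationship pattern can be sketched with plain JDK types. Here `String` stands in for `Relationship` and `java.io.Writer` for `RecordSetWriter`; the class and method names are hypothetical, and a real implementation would create each writer over a new FlowFile's output stream rather than a `StringWriter`.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class StreamingRouter {

    /**
     * Writes each record to the writer of its relationship as soon as it is
     * read, so no list of records is ever accumulated.
     */
    static Map<String, Writer> route(final Iterator<String> records,
                                     final Function<String, String> resolveRelationship) {
        final Map<String, Writer> writers = new LinkedHashMap<>();
        try {
            while (records.hasNext()) {
                final String record = records.next();
                // Lazily create one writer per relationship on first use.
                final Writer writer = writers.computeIfAbsent(
                        resolveRelationship.apply(record), rel -> new StringWriter());
                writer.write(record);
                writer.write('\n');
            }
            return writers;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Close every writer that was opened, whether or not routing succeeded.
            for (final Writer writer : writers.values()) {
                try {
                    writer.close();
                } catch (final IOException ignored) {
                    // Nothing useful can be done here in this sketch.
                }
            }
        }
    }
}
```

Because the input is consumed through an iterator and each record is handed to a writer immediately, memory use in this sketch depends on the number of relationships rather than the number of records.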

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedValidateRecord.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+@Tags({"record", "validate", "script", "groovy", "jython", "python"})
+@CapabilityDescription(
+        "This processor provides the ability to validate records in FlowFiles using the user-provided script. " +
+        "The script is expected to have a record as incoming argument and return with a boolean value. " +
+        "Based on this result, the processor categorizes the records as \"valid\" or \"invalid\" and routes them to the respected relationship in batch. " +
+        "Additionally the original FlowFile will be routed to the \"original\" relationship or in case of unsuccessful processing, to the \"failed\" relationship."
+)
+@SeeAlso(classNames = {
+        "org.apache.nifi.processors.script.ScriptedTransformRecord",
+        "org.apache.nifi.processors.script.ScriptedFilterRecord",
+        "org.apache.nifi.processors.script.ScriptedPartitionRecord"
+})
+public class ScriptedValidateRecord extends ScriptedRouterProcessor<Boolean> {
+
+    static final Relationship RELATIONSHIP_VALID = new Relationship.Builder()
+            .name("valid")
+            .description(
+                "FlowFile containing the valid records from the incoming FlowFile will be routed to this relationship. " +
+                "If there are no valid records, no FlowFile will be routed to this Relationship.")
+            .build();
+
+    static final Relationship RELATIONSHIP_INVALID = new Relationship.Builder()
+            .name("invalid")
+            .description(
+                "FlowFile containing the invalid records from the incoming FlowFile will be routed to this relationship. " +
+                "If there are no invalid records, no FlowFile will be routed to this Relationship.")
+            .build();
+
+    static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description(
+                "After successful processing, the incoming FlowFile will be transferred to this relationship. " +
+                "This happens regardless of whether FlowFiles were routed to the \"valid\" and \"invalid\" relationships.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILED = new Relationship.Builder()
+            .name("failed")

Review comment:
       The typical naming convention is to use 'failure' rather than 'failed'

##########
File path: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ScriptedRouterProcessor.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.script.ScriptingComponentUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@EventDriven
+@SupportsBatching
+@SideEffectFree
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Restricted(restrictions = {
+        @Restriction(requiredPermission = RequiredPermission.EXECUTE_CODE,
+                explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
+})
+public abstract class ScriptedRouterProcessor<T> extends ScriptedProcessor {
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for parsing the incoming FlowFile into Records")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use for serializing Records after they have been transformed")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    static final PropertyDescriptor LANGUAGE = new PropertyDescriptor.Builder()
+            .name("Script Engine")
+            .displayName("Script Language")
+            .description("The Language to use for the script")
+            .allowableValues(SCRIPT_OPTIONS)
+            .defaultValue("Groovy")
+            .required(true)
+            .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(
+            RECORD_READER,
+            RECORD_WRITER,
+            LANGUAGE,
+            ScriptingComponentUtils.SCRIPT_BODY,
+            ScriptingComponentUtils.SCRIPT_FILE,
+            ScriptingComponentUtils.MODULES);
+
+    private final Class<T> scriptResultType;
+
+    /**
+     * @param scriptResultType Defines the expected result type of the user-provided script.
+     */
+    protected ScriptedRouterProcessor(final Class<T> scriptResultType) {
+        this.scriptResultType = scriptResultType;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ScriptEngine scriptEngine = pollScriptEngine();
+        if (scriptEngine == null) {
+            // This shouldn't happen. But just in case.
+            session.rollback();
+            return;
+        }
+
+        boolean success = false;
+
+        try {
+            final ScriptEvaluator evaluator;
+
+            try {
+                evaluator = createEvaluator(scriptEngine, flowFile);
+            } catch (final ScriptException se) {
+                getLogger().error("Failed to initialize script engine", se);
+                session.transfer(flowFile, getFailedRelationship());
+                return;
+            }
+
+            success = route(context, session, flowFile, evaluator);
+        } finally {
+            offerScriptEngine(scriptEngine);
+        }
+
+        session.transfer(flowFile, success ? getOriginalRelationship() : getFailedRelationship());
+    }
+
+    private boolean route(
+        final ProcessContext context,
+        final ProcessSession session,
+        final FlowFile incomingFlowFile,
+        final ScriptEvaluator evaluator
+    ) {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final Map<String, String> originalAttributes = incomingFlowFile.getAttributes();
+
+        try {
+            session.read(incomingFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (
+                        final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), getLogger())
+                    ) {
+                        final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushBackSet = new PushBackRecordSet(recordSet);
+                        final Map<Relationship, List<Record>> outgoingRecords = new HashMap<>();
+                        int index = 0;
+
+                        // Read in records and evaluate the script against each one
+                        while (pushBackSet.isAnotherRecord()) {
+                            final Record record = pushBackSet.next();
+                            final Object evaluatedValue = evaluator.evaluate(record, index++);

Review comment:
       Probably worth logging at a debug level here something to the effect of:
   `getLogger().debug("Evaluated script against {} (index {}), producing result of {}", record, index - 1, evaluatedValue);`
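
One detail worth noting: because the loop passes `index++` to the evaluator, the log call has to report `index - 1`. A minimal, self-contained sketch of one way to avoid that off-by-one is to capture the index once and reuse it. Everything here (`DebugLogSketch`, `format`, `evaluateAll`, the use of `String` records and a `BiFunction` evaluator) is a hypothetical stand-in for illustration, not NiFi API; in the processor the message would presumably go through `getLogger().debug(...)` rather than being collected in a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class DebugLogSketch {

    // Hypothetical helper: fills "{}" placeholders left to right, similar in spirit
    // to the placeholder style used in the suggested log message.
    static String format(final String template, final Object... args) {
        final StringBuilder sb = new StringBuilder();
        int from = 0;
        for (final Object arg : args) {
            final int at = template.indexOf("{}", from);
            if (at < 0) {
                break;
            }
            sb.append(template, from, at).append(arg);
            from = at + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    // Evaluates each record and returns the debug lines that would be logged.
    // Strings stand in for records; the BiFunction stands in for the script evaluator.
    static List<String> evaluateAll(final List<String> records,
                                    final BiFunction<String, Integer, Object> evaluator) {
        final List<String> debugLines = new ArrayList<>();
        int index = 0;
        for (final String record : records) {
            // Capture the index once so the evaluator and the log line agree.
            final int currentIndex = index++;
            final Object evaluatedValue = evaluator.apply(record, currentIndex);
            debugLines.add(format(
                "Evaluated script against {} (index {}), producing result of {}",
                record, currentIndex, evaluatedValue));
        }
        return debugLines;
    }

    public static void main(final String[] args) {
        // Example "script": a record is valid when it is longer than one character.
        evaluateAll(Arrays.asList("a", "bb"), (record, i) -> record.length() > 1)
            .forEach(System.out::println);
    }
}
```

Whether to capture the index or keep `index - 1` is a style choice; the captured variable just keeps the two uses from drifting apart if the loop changes later.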




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org