You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/04/25 17:59:12 UTC

nifi git commit: NIFI-3734: Add ScriptedReader and ScriptedRecordSetWriter

Repository: nifi
Updated Branches:
  refs/heads/master ee4b88620 -> 49a62448c


NIFI-3734: Add ScriptedReader and ScriptedRecordSetWriter

This closes #1691.

Signed-off-by: Andy LoPresto <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49a62448
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49a62448
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49a62448

Branch: refs/heads/master
Commit: 49a62448ce397aa7719b556c26742b8193be281e
Parents: ee4b886
Author: Matt Burgess <ma...@apache.org>
Authored: Tue Apr 25 00:38:14 2017 -0400
Committer: Andy LoPresto <al...@apache.org>
Committed: Tue Apr 25 10:57:21 2017 -0700

----------------------------------------------------------------------
 .../nifi-scripting-processors/pom.xml           |   4 +
 .../script/AbstractScriptedRecordFactory.java   | 228 +++++++++++++++++++
 .../nifi/record/script/ScriptedReader.java      | 145 ++++++++++++
 .../record/script/ScriptedRecordSetWriter.java  | 145 ++++++++++++
 ...org.apache.nifi.controller.ControllerService |  17 ++
 .../record/script/ScriptedReaderTest.groovy     | 202 ++++++++++++++++
 .../script/ScriptedRecordSetWriterTest.groovy   | 137 +++++++++++
 .../groovy/test_record_reader_inline.groovy     |  66 ++++++
 .../groovy/test_record_reader_xml.groovy        |  86 +++++++
 .../groovy/test_record_writer_inline.groovy     |  81 +++++++
 10 files changed, 1111 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
index e487370..fc01a18 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
@@ -39,6 +39,10 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-all</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java
new file mode 100644
index 0000000..9b70fe7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.script;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.script.ScriptingComponentHelper;
+import org.apache.nifi.processors.script.ScriptingComponentUtils;
+import org.apache.nifi.util.StringUtils;
+
+import javax.script.ScriptEngine;
+import java.io.FileInputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An abstract base class containing code common to the Scripted record reader/writer implementations
+ */
+public abstract class AbstractScriptedRecordFactory<T> extends AbstractControllerService {
+
+    protected final AtomicReference<T> recordFactory = new AtomicReference<>();
+
+    protected final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
+
+    protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
+
+    protected volatile ScriptEngine scriptEngine = null;
+    protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
+    protected volatile ConfigurationContext configurationContext = null;
+
+    /**
+     * Lists the property descriptors this scripted controller service exposes.
+     * The descriptors are supplied by the scripting component helper, whose
+     * resources are created the first time this method needs them.
+     *
+     * @return an unmodifiable List of the supported PropertyDescriptor objects
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        // Lazily create the helper's resources; the flag object doubles as the monitor
+        synchronized (scriptingComponentHelper.isInitialized) {
+            final boolean alreadyInitialized = scriptingComponentHelper.isInitialized.get();
+            if (!alreadyInitialized) {
+                scriptingComponentHelper.createResources();
+            }
+        }
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(scriptingComponentHelper.getDescriptors());
+        return Collections.unmodifiableList(descriptors);
+    }
+
+    /**
+     * Creates the descriptor used for any user-defined (dynamic) property. The
+     * stated intent is for such properties to be made available to the script
+     * as variables.
+     *
+     * @param propertyDescriptorName the name of the dynamic property being
+     *                               looked up
+     * @return an optional, dynamic PropertyDescriptor with that name which
+     * supports Expression Language and uses the non-empty validator
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        final PropertyDescriptor.Builder dynamicProperty = new PropertyDescriptor.Builder();
+        dynamicProperty.name(propertyDescriptorName);
+        dynamicProperty.required(false);
+        dynamicProperty.addValidator(StandardValidators.NON_EMPTY_VALIDATOR);
+        dynamicProperty.expressionLanguageSupported(true);
+        dynamicProperty.dynamic(true);
+        return dynamicProperty.build();
+    }
+
+    /**
+     * Marks the script for reload when a script-, module- or engine-related
+     * property changes; an engine change also discards the current engine.
+     *
+     * @param descriptor of the modified property
+     * @param oldValue   non-null property value (previous)
+     * @param newValue   the new property value, or null if the property was removed
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        final boolean scriptSourceChanged = ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
+                || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
+                || ScriptingComponentUtils.MODULES.equals(descriptor);
+        if (!scriptSourceChanged && !scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
+            return;
+        }
+        scriptNeedsReload.set(true);
+        // A different engine was selected, so drop the old one; setup() obtains a new engine when it is null
+        if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
+            scriptEngine = null;
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        return this.scriptingComponentHelper.customValidate(validationContext); // validation is delegated entirely to the helper
+    }
+
+    public void onEnabled(final ConfigurationContext context) {
+        // Keep the context so reloadScript implementations can pass it to the script object
+        configurationContext = context;
+        final String engineName = context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue();
+        scriptingComponentHelper.setScriptEngineName(engineName);
+        final String scriptFile = context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue();
+        scriptingComponentHelper.setScriptPath(scriptFile);
+        final String scriptText = context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue();
+        scriptingComponentHelper.setScriptBody(scriptText);
+        final String moduleList = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
+        // An empty or missing module path means no extra modules; otherwise it is a comma-separated list
+        scriptingComponentHelper.setModules(StringUtils.isEmpty(moduleList) ? new String[0] : moduleList.split(","));
+        setup();
+    }
+
+    public void setup() {
+        // Lazily take one engine from the helper; it stays in scriptEngine until the engine property changes
+        if (scriptEngine == null) {
+            scriptingComponentHelper.setup(1, getLogger());
+            scriptEngine = scriptingComponentHelper.engineQ.poll();
+        }
+        if (scriptEngine == null) {
+            throw new ProcessException("No script engine available!");
+        }
+        if (!scriptNeedsReload.get() && recordFactory.get() != null) {
+            return;
+        }
+        final String scriptPath = scriptingComponentHelper.getScriptPath();
+        if (ScriptingComponentHelper.isFile(scriptPath)) {
+            reloadScriptFile(scriptPath);
+        } else {
+            reloadScriptBody(scriptingComponentHelper.getScriptBody());
+        }
+        scriptNeedsReload.set(false);
+    }
+
+    /**
+     * Reads the script file at the given path and hands its contents to reloadScript.
+     * If reading or loading throws, the error is logged and stored in validationResults.
+     *
+     * @param scriptPath the path to the script file to be loaded
+     * @return the result of reloadScript, or false if an exception was thrown
+     */
+    private boolean reloadScriptFile(final String scriptPath) {
+        try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
+            // The file is decoded with the platform default charset
+            final String scriptText = IOUtils.toString(scriptStream, Charset.defaultCharset());
+            return reloadScript(scriptText);
+
+        } catch (final Exception e) {
+            getLogger().error("Unable to load script: " + e, e);
+
+            // Record the failure as the only validation result for this load attempt
+            final ValidationResult failure = new ValidationResult.Builder()
+                    .subject("ScriptValidation")
+                    .valid(false)
+                    .explanation("Unable to load script due to " + e)
+                    .input(scriptPath)
+                    .build();
+            final Collection<ValidationResult> results = new HashSet<>();
+            results.add(failure);
+            validationResults.set(results);
+
+            // A failure was recorded, so the load did not succeed
+            return false;
+        }
+    }
+
+    /**
+     * Hands the given script text to reloadScript. If that call throws, the error is
+     * logged and stored in validationResults.
+     *
+     * @param scriptBody the contents of the script to be loaded
+     * @return the result of reloadScript, or false if an exception was thrown
+     */
+    private boolean reloadScriptBody(final String scriptBody) {
+        try {
+            return reloadScript(scriptBody);
+
+        } catch (final Exception e) {
+            getLogger().error("Unable to load script: " + e, e);
+
+            // Record the failure as the only validation result for this load attempt
+            // NOTE(review): the script path, not the body, is reported as the input - confirm this is intended
+            final ValidationResult failure = new ValidationResult.Builder()
+                    .subject("ScriptValidation")
+                    .valid(false)
+                    .explanation("Unable to load script due to " + e)
+                    .input(scriptingComponentHelper.getScriptPath())
+                    .build();
+            final Collection<ValidationResult> results = new HashSet<>();
+            results.add(failure);
+            validationResults.set(results);
+
+            // A failure was recorded, so the load did not succeed
+            return false;
+        }
+    }
+
+    protected abstract boolean reloadScript(final String scriptBody); // subclasses evaluate the script text and install the resulting factory in recordFactory
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
new file mode 100644
index 0000000..d95c87f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.script;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.script.ScriptEngineConfigurator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * A RecordReaderFactory implementation that delegates to a RecordReaderFactory instance defined by a user-supplied script
+ */
+@Tags({"record", "recordFactory", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"})
+@CapabilityDescription("Allows the user to provide a scripted RecordReaderFactory instance in order to read/parse/generate records from an incoming flow file.")
+@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
+public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFactory> implements RecordReaderFactory {
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        super.onEnabled(context); // the base class stores the context, configures the scripting helper and loads the script
+    }
+
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
+        // Read the reference once so the null check and the call use the same factory even if a reload happens in between
+        final RecordReaderFactory factory = recordFactory.get();
+        try {
+            // When no factory has been installed (e.g. the script never loaded) callers receive null
+            return factory == null ? null : factory.createRecordReader(flowFile, in, logger);
+        } catch (UndeclaredThrowableException ute) {
+            throw new IOException(ute.getCause());
+        }
+    }
+
+    /**
+     * Evaluates the script and installs the object it binds to "reader" as the
+     * RecordReaderFactory. Failures are logged and stored in validationResults.
+     *
+     * @param scriptBody the text of the script to evaluate
+     * @return true if the script was loaded without recording a validation failure; false otherwise
+     */
+    protected boolean reloadScript(final String scriptBody) {
+        // Start from an empty set of validation results: anything recorded for a
+        // previously loaded script no longer applies
+        final Collection<ValidationResult> results = new HashSet<>();
+
+        try {
+            // The engine must be Invocable so methods can be called on the script's object
+            if (!(scriptEngine instanceof Invocable)) {
+                throw new ScriptException("The configured script engine does not support invoking methods on script objects.");
+            }
+            final Invocable invocable = (Invocable) scriptEngine;
+
+            // Prefer an engine-specific configurator's eval() when one is registered for this engine
+            ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
+            if (configurator != null) {
+                configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
+            } else {
+                // evaluate the script
+                scriptEngine.eval(scriptBody);
+            }
+
+            // The script is expected to bind its factory to a variable named "reader"
+            final Object obj = scriptEngine.get("reader");
+            if (obj == null) {
+                throw new ScriptException("No RecordReader was defined by the script.");
+            }
+            final ComponentLog logger = getLogger();
+            try {
+                // hand the logger to the script object if it accepts one
+                invocable.invokeMethod(obj, "setLogger", logger);
+            } catch (final NoSuchMethodException nsme) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Configured script RecordReaderFactory does not contain a setLogger method.");
+                }
+            }
+
+            if (configurationContext != null) {
+                try {
+                    // hand the configuration context to the script object if it accepts one
+                    invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
+                } catch (final NoSuchMethodException nsme) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Configured script RecordReaderFactory does not contain a setConfigurationContext method.");
+                    }
+                }
+            }
+
+            // getInterface returns null when the object cannot be exposed as the requested
+            // interface; treat that as a load failure instead of installing a null factory
+            final RecordReaderFactory scriptedReader = invocable.getInterface(obj, RecordReaderFactory.class);
+            if (scriptedReader == null) {
+                throw new ScriptException("The script's reader object does not implement RecordReaderFactory.");
+            }
+            recordFactory.set(scriptedReader);
+        } catch (final Exception ex) {
+            final ComponentLog logger = getLogger();
+            final String message = "Unable to load script: " + ex.getLocalizedMessage();
+            logger.error(message, ex);
+            results.add(new ValidationResult.Builder()
+                    .subject("ScriptValidation")
+                    .valid(false)
+                    .explanation("Unable to load script due to " + ex.getLocalizedMessage())
+                    .input(scriptingComponentHelper.getScriptPath())
+                    .build());
+        }
+
+        // store the updated validation results
+        validationResults.set(results);
+
+        // true only when nothing went wrong while loading the script
+        return results.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
new file mode 100644
index 0000000..dd0be2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.script;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.script.ScriptEngineConfigurator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * A RecordSetWriterFactory implementation that delegates to a RecordSetWriterFactory instance defined by a user-supplied script
+ */
+@Tags({"record", "writer", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"})
+@CapabilityDescription("Allows the user to provide a scripted RecordSetWriterFactory instance in order to write records to an outgoing flow file.")
+@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
+public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<RecordSetWriterFactory> implements RecordSetWriterFactory {
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        super.onEnabled(context); // the base class stores the context, configures the scripting helper and loads the script
+    }
+
+    @Override
+    public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException {
+        // Read the reference once so the null check and the call use the same factory even if a reload happens in between
+        final RecordSetWriterFactory factory = recordFactory.get();
+        try {
+            // When no factory has been installed (e.g. the script never loaded) callers receive null
+            return factory == null ? null : factory.createWriter(logger, flowFile, flowFileContent);
+        } catch (UndeclaredThrowableException ute) {
+            throw new IOException(ute.getCause());
+        }
+    }
+
+
+    /**
+     * Evaluates the script and installs the object it binds to "writer" as the
+     * RecordSetWriterFactory. Failures are logged and stored in validationResults.
+     *
+     * @param scriptBody the text of the script to evaluate
+     * @return true if the script was loaded without recording a validation failure; false otherwise
+     */
+    protected boolean reloadScript(final String scriptBody) {
+        // Start from an empty set of validation results: anything recorded for a
+        // previously loaded script no longer applies
+        final Collection<ValidationResult> results = new HashSet<>();
+
+        try {
+            // The engine must be Invocable so methods can be called on the script's object
+            if (!(scriptEngine instanceof Invocable)) {
+                throw new ScriptException("The configured script engine does not support invoking methods on script objects.");
+            }
+            final Invocable invocable = (Invocable) scriptEngine;
+
+            // Prefer an engine-specific configurator's eval() when one is registered for this engine
+            ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
+            if (configurator != null) {
+                configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
+            } else {
+                // evaluate the script
+                scriptEngine.eval(scriptBody);
+            }
+
+            // The script is expected to bind its factory to a variable named "writer"
+            final Object obj = scriptEngine.get("writer");
+            if (obj == null) {
+                throw new ScriptException("No RecordSetWriterFactory was defined by the script.");
+            }
+            final ComponentLog logger = getLogger();
+            try {
+                // hand the logger to the script object if it accepts one
+                invocable.invokeMethod(obj, "setLogger", logger);
+            } catch (final NoSuchMethodException nsme) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Configured script RecordSetWriterFactory does not contain a setLogger method.");
+                }
+            }
+
+            if (configurationContext != null) {
+                try {
+                    // hand the configuration context to the script object if it accepts one
+                    invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
+                } catch (final NoSuchMethodException nsme) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Configured script RecordSetWriterFactory does not contain a setConfigurationContext method.");
+                    }
+                }
+            }
+
+            // getInterface returns null when the object cannot be exposed as the requested
+            // interface; treat that as a load failure instead of installing a null factory
+            final RecordSetWriterFactory scriptedWriter = invocable.getInterface(obj, RecordSetWriterFactory.class);
+            if (scriptedWriter == null) {
+                throw new ScriptException("The script's writer object does not implement RecordSetWriterFactory.");
+            }
+            recordFactory.set(scriptedWriter);
+        } catch (final Exception ex) {
+            final ComponentLog logger = getLogger();
+            final String message = "Unable to load script: " + ex.getLocalizedMessage();
+            logger.error(message, ex);
+            results.add(new ValidationResult.Builder()
+                    .subject("ScriptValidation")
+                    .valid(false)
+                    .explanation("Unable to load script due to " + ex.getLocalizedMessage())
+                    .input(scriptingComponentHelper.getScriptPath())
+                    .build());
+        }
+
+        // store the updated validation results
+        validationResults.set(results);
+
+        // true only when nothing went wrong while loading the script
+        return results.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..f698255
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.record.script.ScriptedReader
+org.apache.nifi.record.script.ScriptedRecordSetWriter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy
new file mode 100644
index 0000000..1025146
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.script
+
+import org.apache.commons.io.FileUtils
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.controller.ConfigurationContext
+import org.apache.nifi.controller.ControllerServiceInitializationContext
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.processor.util.StandardValidators
+import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
+import org.apache.nifi.processors.script.ScriptingComponentHelper
+import org.apache.nifi.processors.script.ScriptingComponentUtils
+import org.apache.nifi.serialization.RecordReader
+import org.apache.nifi.util.MockComponentLog
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.MockPropertyValue
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import static groovy.util.GroovyTestCase.assertEquals
+import static org.junit.Assert.assertNotNull
+import static org.junit.Assert.assertNull
+import static org.mockito.Mockito.mock
+import static org.mockito.Mockito.when
+
+/**
+ * Unit tests for the ScriptedReader class
+ */
+@RunWith(JUnit4.class)
+class ScriptedReaderTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ScriptedReaderTest)
+    def recordReaderFactory
+    def runner
+    def scriptingComponent
+
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        logger.metaClass.methodMissing = {String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+        FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File)
+    }
+
+    @Before
+    void setUp() {
+        recordReaderFactory = new MockScriptedReader()
+        runner = TestRunners
+        scriptingComponent = (AccessibleScriptingComponentHelper) recordReaderFactory
+    }
+
+    @Test
+    void testRecordReaderGroovyScript() {
+
+        def properties = [:] as Map<PropertyDescriptor, String>
+        recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
+            properties.put(descriptor, descriptor.getDefaultValue())
+        }
+
+        // Mock the ConfigurationContext for setup(...)
+        def configurationContext = mock(ConfigurationContext)
+        when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
+                .thenReturn(new MockPropertyValue('Groovy'))
+        when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
+                .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_inline.groovy'))
+        when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
+                .thenReturn(new MockPropertyValue(null))
+        when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
+                .thenReturn(new MockPropertyValue(null))
+
+        def logger = mock(ComponentLog)
+        def initContext = mock(ControllerServiceInitializationContext)
+        when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
+        when(initContext.getLogger()).thenReturn(logger)
+
+        recordReaderFactory.initialize initContext
+        recordReaderFactory.onEnabled configurationContext
+
+        MockFlowFile mockFlowFile = new MockFlowFile(1L)
+        InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
+
+        RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger)
+        assertNotNull(recordReader)
+
+        3.times {
+            def record = recordReader.nextRecord()
+            assertNotNull(record)
+            assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100)
+        }
+        assertNull(recordReader.nextRecord())
+    }
+
+    @Test
+    void testXmlRecordReaderGroovyScript() {
+
+        def properties = [:] as Map<PropertyDescriptor, String>
+        recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
+            properties.put(descriptor, descriptor.getDefaultValue())
+        }
+
+        // Test dynamic property descriptor
+        PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
+                .name('schema.text')
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build()
+
+        def schemaText = '''
+                [
+                  {"id": "int"},
+                  {"name": "string"},
+                  {"code": "int"}
+                ]
+            '''
+        properties.put(SCHEMA_TEXT, schemaText)
+
+        // Mock the ConfigurationContext for setup(...)
+        def configurationContext = mock(ConfigurationContext)
+        when(configurationContext.getProperties()).thenReturn(properties)
+        when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
+                .thenReturn(new MockPropertyValue('Groovy'))
+        when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
+                .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_xml.groovy'))
+        when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
+                .thenReturn(new MockPropertyValue(null))
+        when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
+                .thenReturn(new MockPropertyValue(null))
+        when(configurationContext.getProperty(SCHEMA_TEXT)).thenReturn(new MockPropertyValue(schemaText))
+
+        def logger = new MockComponentLog('ScriptedReader', '')
+        def initContext = mock(ControllerServiceInitializationContext)
+        when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
+        when(initContext.getLogger()).thenReturn(logger)
+
+        recordReaderFactory.initialize initContext
+        recordReaderFactory.onEnabled configurationContext
+
+        MockFlowFile mockFlowFile = new MockFlowFile(1L)
+        mockFlowFile.putAttributes(['record.tag': 'myRecord'])
+
+        InputStream inStream = new ByteArrayInputStream('''
+                <root>
+                  <myRecord>
+                    <id>1</id>
+                    <name>John</name>
+                    <code>100</code>
+                  </myRecord>
+                    <myRecord>
+                    <id>2</id>
+                    <name>Mary</name>
+                    <code>200</code>
+                  </myRecord>
+                  <myRecord>
+                    <id>3</id>
+                    <name>Ramon</name>
+                    <code>300</code>
+                  </myRecord>
+                </root>
+            '''.bytes)
+
+        RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger)
+        assertNotNull(recordReader)
+
+        3.times {
+            def record = recordReader.nextRecord()
+            assertNotNull(record)
+            assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100)
+        }
+        assertNull(recordReader.nextRecord())
+
+    }
+
+    class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper {
+
+        @Override
+        ScriptingComponentHelper getScriptingComponentHelper() {
+            return this.@scriptingComponentHelper
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
new file mode 100644
index 0000000..d4e7d5a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.script
+
+import org.apache.commons.io.FileUtils
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.controller.ConfigurationContext
+import org.apache.nifi.controller.ControllerServiceInitializationContext
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
+import org.apache.nifi.processors.script.ScriptingComponentHelper
+import org.apache.nifi.processors.script.ScriptingComponentUtils
+import org.apache.nifi.serialization.RecordSetWriter
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.MapRecord
+import org.apache.nifi.serialization.record.RecordField
+import org.apache.nifi.serialization.record.RecordFieldType
+import org.apache.nifi.serialization.record.RecordSet
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.MockPropertyValue
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import static org.junit.Assert.assertNotNull
+import static org.junit.Assert.assertEquals
+import static org.mockito.Mockito.mock
+import static org.mockito.Mockito.when
+
+/**
+ * Unit tests for the ScriptedRecordSetWriter class
+ */
+@RunWith(JUnit4.class)
+class ScriptedRecordSetWriterTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ScriptedRecordSetWriterTest)
+    MockScriptedWriter recordSetWriterFactory
+    def runner
+    def scriptingComponent
+
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        logger.metaClass.methodMissing = {String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+        FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File)
+    }
+
+    @Before
+    void setUp() {
+        recordSetWriterFactory = new MockScriptedWriter()
+        runner = TestRunners
+        scriptingComponent = (AccessibleScriptingComponentHelper) recordSetWriterFactory
+    }
+
+    @Test
+    void testRecordWriterGroovyScript() {
+
+        def properties = [:] as Map<PropertyDescriptor, String>
+        recordSetWriterFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
+            properties.put(descriptor, descriptor.getDefaultValue())
+        }
+
+        // Mock the ConfigurationContext for setup(...)
+        def configurationContext = mock(ConfigurationContext)
+        when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
+                .thenReturn(new MockPropertyValue('Groovy'))
+        when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
+                .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_writer_inline.groovy'))
+        when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
+                .thenReturn(new MockPropertyValue(null))
+        when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
+                .thenReturn(new MockPropertyValue(null))
+
+        def logger = mock(ComponentLog)
+        def initContext = mock(ControllerServiceInitializationContext)
+        when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
+        when(initContext.getLogger()).thenReturn(logger)
+
+        recordSetWriterFactory.initialize initContext
+        recordSetWriterFactory.onEnabled configurationContext
+
+        MockFlowFile mockFlowFile = new MockFlowFile(1L)
+        InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
+
+        RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, mockFlowFile, inStream)
+        assertNotNull(recordSetWriter)
+
+        def recordSchema = new SimpleRecordSchema(
+                [new RecordField('id', RecordFieldType.INT.dataType),
+                 new RecordField('name', RecordFieldType.STRING.dataType),
+                 new RecordField('code', RecordFieldType.INT.dataType)]
+        )
+
+        def records = [
+                new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]),
+                new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]),
+                new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
+        ] as MapRecord[]
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
+        recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream)
+
+        def xml = new XmlSlurper().parseText(outputStream.toString())
+        assertEquals('1', xml.record[0].id.toString())
+        assertEquals('200', xml.record[1].code.toString())
+        assertEquals('Ramon', xml.record[2].name.toString())
+    }
+
+    class MockScriptedWriter extends ScriptedRecordSetWriter implements AccessibleScriptingComponentHelper {
+
+        @Override
+        ScriptingComponentHelper getScriptingComponentHelper() {
+            return this.@scriptingComponentHelper
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
new file mode 100644
index 0000000..2be37df
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.schema.access.SchemaNotFoundException
+import org.apache.nifi.serialization.MalformedRecordException
+import org.apache.nifi.serialization.RecordReader
+import org.apache.nifi.serialization.RecordReaderFactory
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.MapRecord
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.serialization.record.RecordField
+import org.apache.nifi.serialization.record.RecordFieldType
+import org.apache.nifi.serialization.record.RecordSchema
+
+
+class GroovyRecordReader implements RecordReader {
+
+    def recordSchema = new SimpleRecordSchema(
+            [new RecordField('id', RecordFieldType.INT.dataType),
+             new RecordField('name', RecordFieldType.STRING.dataType),
+             new RecordField('code', RecordFieldType.INT.dataType)]
+    )
+
+    def recordIterator = [
+            new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]),
+            new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]),
+            new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
+    ].iterator()
+
+    Record nextRecord() throws IOException, MalformedRecordException {
+        return recordIterator.hasNext() ? recordIterator.next() : null
+    }
+
+    RecordSchema getSchema() throws MalformedRecordException {
+        return recordSchema
+    }
+
+    void close() throws IOException {
+    }
+}
+
+class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
+
+    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
+        return new GroovyRecordReader()
+    }
+}
+
+reader = new GroovyRecordReaderFactory()

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
new file mode 100644
index 0000000..d51089b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonSlurper
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.controller.ConfigurationContext
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.schema.access.SchemaNotFoundException
+import org.apache.nifi.serialization.MalformedRecordException
+import org.apache.nifi.serialization.RecordReader
+import org.apache.nifi.serialization.RecordReaderFactory
+import org.apache.nifi.serialization.SimpleRecordSchema
+import org.apache.nifi.serialization.record.MapRecord
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.serialization.record.RecordField
+import org.apache.nifi.serialization.record.RecordFieldType
+import org.apache.nifi.serialization.record.RecordSchema
+
+
+class GroovyXmlRecordReader implements RecordReader {
+
+    def recordIterator
+    def recordSchema
+
+    GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) {
+        recordSchema = schema
+        def xml = new XmlSlurper().parse(inputStream)
+        // Change the XML fields to a MapRecord for each incoming record
+        recordIterator = xml[recordTag].collect {r ->
+            // Create a map of field names to values, using the field names from the schema as keys into the XML object
+            def fields = recordSchema.fieldNames.inject([:]) {result, fieldName ->
+                result[fieldName] = r[fieldName].toString()
+                result
+            }
+            new MapRecord(recordSchema, fields)
+        }.iterator()
+    }
+
+    Record nextRecord() throws IOException, MalformedRecordException {
+        return recordIterator?.hasNext() ? recordIterator.next() : null
+    }
+
+    RecordSchema getSchema() throws MalformedRecordException {
+        return recordSchema
+    }
+
+    void close() throws IOException {
+    }
+}
+
+class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
+
+    // Will be set by the ScriptedReader
+    ConfigurationContext configurationContext
+
+    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
+        // Expecting 'schema.text' to be a JSON array of objects, each mapping a field name (key) to a RecordFieldType name (value)
+        def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue()
+        if (!schemaText) return null
+        def jsonSchema = new JsonSlurper().parseText(schemaText)
+        def recordSchema = new SimpleRecordSchema(jsonSchema.collect {field ->
+            def entry = field.entrySet()[0]
+            new RecordField(entry.key, RecordFieldType.of(entry.value).dataType)
+        } as List<RecordField>)
+        return new GroovyXmlRecordReader(flowFile.getAttribute('record.tag'), recordSchema, inputStream)
+    }
+
+}
+
+// Create an instance of RecordReaderFactory called "reader", this is the entry point for ScriptedReader
+reader = new GroovyXmlRecordReaderFactory()

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a62448/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
new file mode 100644
index 0000000..e17b701
--- /dev/null
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.xml.MarkupBuilder
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.schema.access.SchemaNotFoundException
+import org.apache.nifi.serialization.RecordSetWriter
+import org.apache.nifi.serialization.RecordSetWriterFactory
+import org.apache.nifi.serialization.WriteResult
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.serialization.record.RecordSet
+import org.apache.nifi.stream.io.NonCloseableOutputStream
+
+
+class GroovyRecordSetWriter implements RecordSetWriter {
+
+    @Override
+    WriteResult write(Record r, OutputStream out) throws IOException {
+        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> // wrapper keeps the caller's stream open
+            new MarkupBuilder(osw).record { // emit a single <record> element
+                r.schema.fieldNames.each {fieldName ->
+                    "$fieldName" r.getValue(fieldName) // one child element per schema field
+                }
+            }
+        }
+        WriteResult.of(1, [:]) // exactly one record was written, so report a record count of 1
+    }
+
+    @Override
+    String getMimeType() {
+        return 'application/xml'
+    }
+
+    @Override
+    WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+        int count = 0
+
+        new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw ->
+            new MarkupBuilder(osw).recordSet {
+
+                Record r
+                while (r = rs.next()) {
+                    count++
+
+                    record {
+                        rs.schema.fieldNames.each {fieldName ->
+                            "$fieldName" r.getValue(fieldName)
+                        }
+                    }
+                }
+            }
+        }
+        WriteResult.of(count, [:])
+    }
+}
+
+class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
+
+    @Override
+    RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException {
+        return new GroovyRecordSetWriter()
+    }
+}
+
+writer = new GroovyRecordSetWriterFactory()