You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:55 UTC

[32/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
new file mode 100644
index 0000000..9058cf4
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.script.ScriptException;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.EventDriven;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.scripting.ConverterScript;
+import org.apache.nifi.scripting.ReaderScript;
+import org.apache.nifi.scripting.Script;
+import org.apache.nifi.scripting.ScriptFactory;
+import org.apache.nifi.scripting.WriterScript;
+
+/**
+ * <!-- Processor Documentation ================================================== -->
+ * <h2>Description:</h2>
+ * <p>
+ * This processor provides the capability to execute scripts in various
+ * scripting languages, and passes into the scripts the input stream and output
+ * stream(s) representing an incoming flow file and any created flow files. The
+ * processor is designed to be thread safe, so multiple concurrent tasks may
+ * execute against a single script. The processor provides a framework which
+ * enables script writers to implement 3 different types of scripts:
+ * <ul>
+ * <li>ReaderScript - which enables stream-based reading of a FlowFile's
+ * content</li>
+ * <li>WriterScript - which enables stream-based reading and
+ * writing/modifying of a FlowFile's content</li>
+ * <li>ConverterScript - which enables stream-based reading of a FlowFile's
+ * content and stream-based writing to newly created FlowFiles</li>
+ * </ul>
+ * Presently, the processor supports 3 scripting languages: Ruby, Python, and
+ * JavaScript. The processor is built on the javax.script API which enables
+ * ScriptEngine discovery, thread management, and encapsulates much of the low
+ * level bridging-code that enables Java to Script language integration. Thus,
+ * it is designed to be easily extended to other scripting languages. <br/> The
+ * attributes of a FlowFile and properties of the Processor are exposed to the
+ * script by either a variable in the base class or a getter method. A script
+ * may declare new Processor Properties and different Relationships via
+ * overriding the getPropertyDescriptors and getRelationships methods,
+ * respectively.
+ * </p>
+ * <p>
+ * <strong>Properties:</strong>
+ * </p>
+ * <p>
+ * In the list below, the names of required properties appear in bold. Any other
+ * properties (not in bold) are considered optional. If a property has a default
+ * value, it is indicated. If a property supports the use of the NiFi Expression
+ * Language (or simply, "expression language"), that is also indicated. Of
+ * particular note: This processor allows scripts to define additional Processor
+ * properties, which will not be initially visible. Once the processor's
+ * configuration is validated, script defined properties will become visible,
+ * and may affect the validity of the processor.
+ * </p>
+ * <ul>
+ * <li>
+ * <strong>Script File Name</strong>
+ * <ul>
+ * <li>Script location, can be relative or absolute path.</li>
+ * <li>Default value: no default</li>
+ * <li>Supports expression language: false</li>
+ * </ul>
+ * </li>
+ * <li>
+ * <strong>Script Check Interval</strong>
+ * <ul>
+ * <li>The time period between checking for updates to a script.</li>
+ * <li>Default value: 15 sec</li>
+ * <li>Supports expression language: false</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * <strong>Relationships:</strong>
+ * </p>
+ * <p>
+ * The initial 'out of the box' relationships are below. Of particular note is
+ * the ability of a script to change the set of relationships. However, any
+ * relationships defined by the script will not be visible until the processor's
+ * configuration has been validated. Once done, new relationships will become
+ * visible.
+ * </p>
+ * <ul>
+ * <li>
+ * success
+ * <ul>
+ * <li>Used when a file is successfully processed by a script.</li>
+ * </ul>
+ * </li>
+ * <li>
+ * failure
+ * <ul>
+ * <li>Used when an error occurs while processing a file with a script.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * <strong>Example Scripts:</strong>
+ * </p>
+ * <ul>
+ * JavaScript example - the 'with' statement imports packages defined in the
+ * framework. Since the 'instance' variable is intended to be local scope (not
+ * global), it must be named 'instance' as it is not passed back to the
+ * processor upon script evaluation and must be fetched. If you make it global,
+ * you can name it whatever you'd like...but this is intended to be
+ * multi-threaded so do so at your own risk. Presently, there are issues with
+ * the JavaScript scripting engine that prevent sub-classing the base classes in
+ * the Processor's Java framework. So, what is actually happening is an instance
+ * of the ReaderScript is created with a provided callback object. When we are
+ * able to move to a more competent scripting engine, the code below will remain
+ * the same, but the 'instance' variable will actually be a sub-class of
+ * ReaderScript.
+ *
+ * <pre>
+ *               with (Scripting) {
+ *                 var instance = new ReaderScript({
+ *                     route : function(input) {
+ *                         var str = IOUtils.toString(input);
+ *                         var expr = instance.getProperty("expr");
+ *                         filename = instance.attributes.get("filename");
+ *                         instance.setAttribute("filename", filename + ".modified");
+ *                         if (str.match(expr)) {
+ *                             return Script.FAIL_RELATIONSHIP;
+ *                         } else {
+ *                             return Script.SUCCESS_RELATIONSHIP;
+ *                         }
+ *                     }
+ *                 });
+ *               }
+ * </pre>
+ *
+ * Ruby example - the 'OutputStreamHandler' is an interface which is called when
+ * creating flow files.
+ *
+ * <pre>
+ *                 java_import 'org.apache.nifi.scripting.OutputStreamHandler'
+ *                 class SimpleConverter < ConverterScript
+ *                   field_reader :FAIL_RELATIONSHIP, :SUCCESS_RELATIONSHIP, :logger, :attributes
+ *
+ *                   def convert(input)
+ *                     in_io = input.to_io
+ *                     createFlowFile("firstLine", FAIL_RELATIONSHIP, OutputStreamHandler.impl do |method, out|
+ *                         out_io = out.to_io
+ *                         out_io << in_io.readline.to_java_bytes
+ *                         out_io.close
+ *                         logger.debug("Wrote data to failure...this message logged with logger from super class")
+ *                       end)
+ *
+ *                     createFlowFile("otherLines", SUCCESS_RELATIONSHIP, OutputStreamHandler.impl do |method, out|
+ *                         out_io = out.to_io
+ *                         in_io.each_line { |line|
+ *                           out_io << line
+ *                         }
+ *                         out_io.close
+ *                         logger.debug("Wrote data to success...this message logged with logger from super class")
+ *                       end)
+ *                     in_io.close
+ *                   end
+ *
+ *                 end
+ *
+ *                 $logger.debug("Creating SimpleConverter...this message logged with logger from shared variables")
+ *                 SimpleConverter.new
+ * </pre>
+ *
+ * Python example - The difficulty with Python is that it does not return
+ * objects upon script evaluation, so the instance of the Script class must be
+ * fetched by name. Thus, you must define a variable called 'instance'.
+ *
+ * <pre>
+ *                 import re
+ *
+ *                 class RoutingReader(ReaderScript):
+ *                     A = Relationship.Builder().name("a").description("some good stuff").build()
+ *                     B = Relationship.Builder().name("b").description("some other stuff").build()
+ *                     C = Relationship.Builder().name("c").description("some bad stuff").build()
+ *
+ *                     def getRelationships(self):
+ *                         return [self.A,self.B,self.C]
+ *
+ *                     def getExceptionRoute(self):
+ *                         return self.C
+ *
+ *                     def route( self, input ):
+ *                         for line in FileUtil.wrap(input):
+ *                             if re.match("^bad", line, re.IGNORECASE):
+ *                                 return self.B
+ *                             if re.match("^sed", line):
+ *                                 raise RuntimeError("That's no good!")
+ *
+ *                         return self.A
+ *
+ *                 instance = RoutingReader()
+ * </pre>
+ *
+ * </ul>
+ * <p>
+ * <strong>Shared Variables</strong>
+ * </p>
+ * <ul>
+ * <li>logger : global scope</li>
+ * <li>properties : local/instance scope</li>
+ * </ul>
+ * <p>
+ * <strong>Script API:</strong>
+ * </p>
+ * <ul>
+ * <li>getAttribute(String) : String</li>
+ * <li>getAttributes() : Map(String,String)</li>
+ * <li>getExceptionRoute() : Relationship</li>
+ * <li>getFileName() : String</li>
+ * <li>getFlowFileEntryDate() : Calendar</li>
+ * <li>getFlowFileSize() : long</li>
+ * <li>getProperties() : Map(String, String)</li>
+ * <li>getProperty(String) : String</li>
+ * <li>getPropertyDescriptors() : List(PropertyDescriptor)</li>
+ * <li>getRelationships() : Collection(Relationship)</li>
+ * <li>getRoute() : Relationship</li>
+ * <li>setRoute(Relationship)</li>
+ * <li>setAttribute(String, String)</li>
+ * <li>validate() : Collection(String)</li>
+ * </ul>
+ * <p>
+ * <strong>ReaderScript API:</strong>
+ * </p>
+ * <ul>
+ * <li>route(InputStream) : Relationship</li>
+ * </ul>
+ * <p>
+ * <strong>WriterScript API:</strong>
+ * </p>
+ * <ul>
+ * <li>process(InputStream, OutputStream)</li>
+ * </ul>
+ * <p>
+ * <strong>ConverterScript API:</strong>
+ * </p>
+ * <ul>
+ * <li>convert(InputStream)</li>
+ * <li>createFlowFile(String, Relationship, OutputStreamHandler)</li>
+ * </ul>
+ * <p>
+ * <strong>OutputStreamHandler API:</strong>
+ * </p>
+ * <ul>
+ * <li>write(OutputStream)</li>
+ * </ul>
+ */
+@EventDriven
+@Tags({"script", "ruby", "python", "javascript", "execute"})
+@CapabilityDescription("Execute scripts in various scripting languages, and passes into the scripts the input stream and output stream(s) "
+        + "representing an incoming flow file and any created flow files.")
+public class ExecuteScript extends AbstractProcessor {
+
+    /**
+     * True when the next call to customValidate must (re)load and validate
+     * the script: initially, after any property change, and after a
+     * validation that did not succeed.
+     */
+    private final AtomicBoolean doCustomValidate = new AtomicBoolean(true);
+    /**
+     * Relationships currently exposed by this processor; empty until a script
+     * has been validated, then replaced by the script's relationships.
+     */
+    private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+    /**
+     * The built-in descriptors followed by the descriptors declared by the
+     * most recently validated script.
+     */
+    private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
+    private volatile ScriptFactory scriptFactory;
+    /**
+     * Relationship that receives the incoming flow file when script execution
+     * fails; obtained from the script during validation.
+     */
+    private volatile Relationship exceptionRoute;
+
+    /**
+     * Script location, can be relative or absolute path -- passed as-is to
+     * {@link File#File(String) File constructor}
+     */
+    public static final PropertyDescriptor SCRIPT_FILE_NAME = new PropertyDescriptor.Builder()
+            .name("Script File Name")
+            .description("Script location, can be relative or absolute path")
+            .required(true)
+            .addValidator(new Validator() {
+
+                @Override
+                public ValidationResult validate(String subject, String input, ValidationContext context) {
+                    ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, input, context);
+                    if (result.isValid()) {
+                        // Look for the extension in the last path segment only, so that a dot
+                        // in a directory part (e.g. "../script" or "conf.d/script") is not
+                        // mistaken for a file extension.
+                        final String fileName = new File(input).getName();
+                        if (fileName.lastIndexOf('.') < 1) {
+                            result = new ValidationResult.Builder()
+                                    .subject(subject)
+                                    .valid(false)
+                                    .explanation("Filename must have an extension")
+                                    .input(input)
+                                    .build();
+                        }
+                    }
+                    return result;
+                }
+            })
+            .build();
+
+    /**
+     * The time period between checking for updates to a script.
+     */
+    static final PropertyDescriptor SCRIPT_CHECK_INTERVAL = new PropertyDescriptor.Builder()
+            .name("Script Check Interval")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .description("The time period between checking for updates to a script")
+            .required(true)
+            .defaultValue("15 sec")
+            .build();
+
+    /**
+     * Sets up the initial state: no relationships, the two built-in property
+     * descriptors, and a script factory bound to this processor's logger.
+     *
+     * @param context the initialization context supplied by the framework
+     */
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        Set<Relationship> empty = Collections.emptySet();
+        relationships.set(empty);
+        List<PropertyDescriptor> propDescs = new ArrayList<>();
+        propDescs.add(SCRIPT_FILE_NAME);
+        propDescs.add(SCRIPT_CHECK_INTERVAL);
+        propertyDescriptors.set(Collections.unmodifiableList(propDescs));
+        scriptFactory = new ScriptFactory(getLogger());
+    }
+
+    /**
+     * @return the built-in descriptors plus any descriptors published by the
+     * most recently validated script
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors.get();
+    }
+
+    /**
+     * Accepts any property name as a dynamic property with no value
+     * restrictions.
+     *
+     * @param propertyDescriptorName name of the dynamic property
+     * @return a dynamic descriptor of that name that accepts every value
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .dynamic(true)
+                .addValidator(Validator.VALID)
+                .build();
+    }
+
+    /**
+     * Flags the configuration as changed so that the next validation reloads
+     * and re-validates the script.
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        doCustomValidate.set(true);
+    }
+
+    /**
+     * @return the relationships published by the most recently validated
+     * script, or an empty set if no script has been validated yet
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships.get();
+    }
+
+    /**
+     * Called by the framework to validate the processor's configuration.
+     *
+     * When the configuration has changed, or the previous validation did not
+     * succeed, the script is loaded, its relationships and property
+     * descriptors are published on this processor, and the configured
+     * properties are checked against the script's descriptors and against the
+     * script's own validate() method.
+     *
+     * @param validationContext provides the configured property values
+     * @return the reasons why this processor cannot be run; empty when there
+     * are none or when nothing changed since the last successful validation
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        if (!doCustomValidate.getAndSet(false)) {
+            // nothing changed since the last successful validation
+            return Collections.<ValidationResult>emptyList();
+        }
+
+        long interval = validationContext.getProperty(SCRIPT_CHECK_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+        scriptFactory.setScriptCheckIntervalMS(interval);
+        List<ValidationResult> results = new ArrayList<>();
+        String file = validationContext.getProperty(SCRIPT_FILE_NAME).getValue();
+        try {
+            Script s = scriptFactory.getScript(file);
+
+            // set the relationships of the processor
+            relationships.set(new HashSet<>(s.getRelationships()));
+
+            // need to get script's prop. descs. and validate. May, or may not, have dynamic
+            // props already...depends if this is the first time the processor is being configured.
+            Map<PropertyDescriptor, String> properties = validationContext.getProperties();
+
+            // need to compare props, if any, against script-expected props that are required.
+            // script may be expecting required props that are not known, or some props may have invalid
+            // values.
+            // processor may be configured with dynamic props that the script will use...but does not declare which would
+            // be a bad thing
+            List<PropertyDescriptor> scriptPropDescs = s.getPropertyDescriptors();
+            getLogger().debug("Script is {}", new Object[]{s});
+            getLogger().debug("Script file name is {}", new Object[]{s.getFileName()});
+            getLogger().debug("Script Prop Descs are: {}", new Object[]{scriptPropDescs.toString()});
+            getLogger().debug("Thread is: {}", new Object[]{Thread.currentThread().toString()});
+            for (PropertyDescriptor propDesc : scriptPropDescs) {
+                // need to check for missing props
+                if (propDesc.isRequired() && !properties.containsKey(propDesc)) {
+                    results.add(new ValidationResult.Builder()
+                            .subject("Script Properties")
+                            .valid(false)
+                            .explanation("Missing Property " + propDesc.getName())
+                            .build());
+
+                    // need to validate current value against script provided validator
+                } else if (properties.containsKey(propDesc)) {
+                    String value = properties.get(propDesc);
+                    ValidationResult result = propDesc.validate(value, validationContext);
+                    if (!result.isValid()) {
+                        results.add(result);
+                    }
+                } // else it is an optional prop according to the script and it is not specified by
+                // the configuration of the processor
+            }
+
+            // Publish the built-in descriptors plus what we just got from the script. The list is
+            // rebuilt from the built-ins rather than appended to the previous list, otherwise every
+            // re-validation would add the script's descriptors again.
+            List<PropertyDescriptor> pds = new ArrayList<>();
+            pds.add(SCRIPT_FILE_NAME);
+            pds.add(SCRIPT_CHECK_INTERVAL);
+            pds.addAll(scriptPropDescs);
+            propertyDescriptors.set(Collections.unmodifiableList(pds));
+
+            if (results.isEmpty()) {
+                // so needed props are supplied and individually validated, now validate script
+                Collection<String> reasons = s.validate();
+                if (null == reasons) {
+                    getLogger().warn("Script had invalid return value for validate(), ignoring.");
+                } else {
+                    for (String reason : reasons) {
+                        ValidationResult result = new ValidationResult.Builder()
+                                .subject("ScriptValidation")
+                                .valid(false)
+                                .explanation(reason)
+                                .build();
+                        results.add(result);
+                    }
+                }
+            }
+
+            // get the exception route
+            exceptionRoute = s.getExceptionRoute();
+
+            if (!results.isEmpty()) {
+                // Keep reporting the problems on subsequent calls; without this the next call
+                // would skip validation and the processor would appear to be valid.
+                doCustomValidate.set(true);
+            }
+            return results;
+        } catch (ScriptException | IOException | NoSuchMethodException e) {
+            // the script could not be created, so try again on the next validation
+            doCustomValidate.set(true);
+            results.add(new ValidationResult.Builder()
+                    .subject("ScriptValidation")
+                    .valid(false)
+                    .explanation("Cannot create script due to " + e.getMessage())
+                    .input(file)
+                    .build());
+            getLogger().error("Cannot create script due to " + e, e);
+            return results;
+        }
+    }
+
+    /**
+     * Runs the configured script against the next available flow file.
+     *
+     * A ReaderScript reads the content, a WriterScript reads and rewrites it,
+     * and a ConverterScript creates new flow files after which the incoming
+     * flow file is removed. For the other cases the script's attributes are
+     * applied and the flow file is transferred to the script's route, or
+     * removed if the script chose no route.
+     *
+     * @param context provides the configured property values
+     * @param session the session used to read, write and route flow files
+     * @throws ProcessException if the script cannot be created, or if it
+     * fails and no exception route is known
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return; // fail-fast if there is no work to do
+        }
+
+        final String scriptFileName = context.getProperty(SCRIPT_FILE_NAME).getValue();
+        // doing this cloning because getProperties does not initialize props that have only their default values
+        // must do a getProperty for that value to be initialized
+        Map<String, String> props = new HashMap<>();
+        for (PropertyDescriptor propDesc : context.getProperties().keySet()) {
+            if (propDesc.isExpressionLanguageSupported()) {
+                props.put(propDesc.getName(), context.getProperty(propDesc).evaluateAttributeExpressions(flowFile).getValue());
+            } else {
+                props.put(propDesc.getName(), context.getProperty(propDesc).getValue());
+            }
+        }
+        // stays null until the script has been created; tells the generic handler below
+        // whether the failure happened while creating or while executing the script
+        Script script = null;
+        try {
+            final Script finalScript = scriptFactory.getScript(scriptFileName, props, flowFile);
+            script = finalScript;
+            if (finalScript instanceof ReaderScript) {
+                session.read(flowFile, new InputStreamCallback() {
+
+                    @Override
+                    public void process(InputStream in) throws IOException {
+                        try {
+                            ((ReaderScript) finalScript).process(new BufferedInputStream(in));
+                        } catch (NoSuchMethodException | ScriptException e) {
+                            getLogger().error("Failed to execute ReaderScript", e);
+                            throw new IOException(e);
+                        }
+                    }
+                });
+            } else if (finalScript instanceof WriterScript) {
+                flowFile = session.write(flowFile, new StreamCallback() {
+
+                    @Override
+                    public void process(InputStream in, OutputStream out) throws IOException {
+                        try {
+                            ((WriterScript) finalScript).process(new BufferedInputStream(in), new BufferedOutputStream(out));
+                            out.flush();
+                        } catch (NoSuchMethodException | ScriptException e) {
+                            getLogger().error("Failed to execute WriterScript", e);
+                            throw new IOException(e);
+                        }
+                    }
+                });
+            } else if (finalScript instanceof ConverterScript) {
+                ((ConverterScript) finalScript).process(session);
+
+                // Note that these scripts don't pass the incoming FF through,
+                // they always create new outputs
+                session.remove(flowFile);
+                return;
+            } else {
+                // only thing we can do is assume script has already run and done its thing, so just transfer the incoming
+                // flowfile
+                getLogger().debug("Successfully executed script from {}", new Object[]{scriptFileName});
+            }
+
+            // update flow file attributes
+            flowFile = session.putAllAttributes(flowFile, finalScript.getAttributes());
+            Relationship route = finalScript.getRoute();
+            if (null == route) {
+                session.remove(flowFile);
+                getLogger().info("Removing flowfile {}", new Object[]{flowFile});
+            } else {
+                session.transfer(flowFile, route);
+                getLogger().info("Transferring flowfile {} to {}", new Object[]{flowFile, route});
+            }
+        } catch (ScriptException | IOException e) {
+            getLogger().error("Failed to create script from {} with flowFile {}. Rolling back session.",
+                    new Object[]{scriptFileName, flowFile}, e);
+            throw new ProcessException(e);
+        } catch (Exception e) {
+            // read the volatile field once so the check and the transfer see the same value
+            final Relationship failureRoute = exceptionRoute;
+            if (null != script && null != failureRoute) {
+                getLogger().error("Failed to execute script from {}. Transferring flow file {} to {}",
+                        new Object[]{scriptFileName, flowFile, failureRoute}, e);
+                session.transfer(flowFile, failureRoute);
+            } else {
+                // either the script was never created or there is no exception route to send
+                // the flow file to, so the only safe option is to roll back
+                getLogger().error("Failed to execute script from {} with flowFile {}. Rolling back session",
+                        new Object[]{scriptFileName, flowFile}, e);
+                throw new ProcessException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java
new file mode 100644
index 0000000..7be47a8
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+/**
+ * <p>
+ * Script authors should extend this class if they want to perform complex
+ * conversions in a NiFi processor.
+ * </p>
+ *
+ * <p>
+ * Scripts must implement {@link #convert(InputStream)}. This method may
+ * create new FlowFiles and pass them to one or more routes. The input FlowFile
+ * will be removed from the repository after execution of this method completes.
+ * </p>
+ *
+ * <p>
+ * In general, {@link #convert(InputStream)} will read from the supplied
+ * stream, then create one or more output FlowFiles and route each result to
+ * the relationship of choice using
+ * {@link #createFlowFile(String, Relationship, OutputStreamHandler)}.
+ * </p>
+ *
+ * <p>
+ * Implement {@link #getRelationships()} to allow writing to
+ * relationships other than <code>success</code> and <code>failure</code>. The
+ * {@link #getRoute()} superclass method is *not* used by Converter Scripts.
+ * </p>
+ *
+ */
+public class ConverterScript extends Script {
+
+    private ProcessSession session; // used to create files
+    private Object convertCallback;
+
+    /**
+     * Creates a converter without a callback object; convert(InputStream)
+     * does nothing unless a subclass overrides it.
+     */
+    public ConverterScript() {
+
+    }
+
+    /**
+     * Creates a converter backed by callback objects. The first callback that
+     * is a Map containing the key "convert" is remembered and invoked by
+     * convert(InputStream).
+     *
+     * @param callbacks callback objects, all of which are also handed to the
+     * superclass constructor
+     */
+    public ConverterScript(Object... callbacks) {
+        super(callbacks);
+        for (Object candidate : callbacks) {
+            if (convertCallback != null) {
+                // the first matching callback wins
+                break;
+            }
+            if (candidate instanceof Map<?, ?> && ((Map<?, ?>) candidate).containsKey("convert")) {
+                convertCallback = candidate;
+            }
+        }
+    }
+
+    /**
+     * Basic conversion logic; subclasses should override this. This
+     * implementation invokes the "convert" method of the callback object, if
+     * one was supplied, and otherwise does nothing.
+     *
+     * @param stream the content of the incoming flow file
+     * @throws NoSuchMethodException if the callback has no "convert" method
+     * @throws ScriptException if the callback fails
+     */
+    protected void convert(InputStream stream) throws NoSuchMethodException, ScriptException {
+        if (convertCallback == null) {
+            return;
+        }
+        final Invocable invocable = (Invocable) engine;
+        invocable.invokeMethod(convertCallback, "convert", stream);
+    }
+
+    /**
+     * Owning processor uses this method to kick off handling of a single file
+     *
+     * @param aSession the owning processor's session (needed to read the
+     * incoming flow file and to make new files)
+     */
+    public void process(ProcessSession aSession) {
+        this.session = aSession;
+        final InputStreamCallback reader = new InputStreamCallback() {
+
+            @Override
+            public void process(InputStream in) throws IOException {
+                try {
+                    convert(new BufferedInputStream(in));
+                } catch (NoSuchMethodException | ScriptException e) {
+                    logger.error("Failed to execute 'convert' function in script", e);
+                    throw new IOException(e);
+                }
+            }
+        };
+        this.session.read(this.flowFile, reader);
+    }
+
+    /**
+     * Creates a new flow file derived from the incoming one, names it, lets
+     * the handler write its content, and transfers it to the relationship.
+     * If anything fails after creation, the error is logged and the new flow
+     * file is removed.
+     *
+     * @param flowFileName value for the new flow file's filename attribute
+     * @param relationship where the new flow file is transferred
+     * @param handler writes the content of the new flow file
+     */
+    // this should go back to protected once we get Nashorn
+    public void createFlowFile(final String flowFileName, final Relationship relationship, final OutputStreamHandler handler) {
+        final FlowFile child = session.create(this.flowFile);
+        FlowFile current = session.putAttribute(child, CoreAttributes.FILENAME.key(), flowFileName);
+        final OutputStreamCallback writer = new OutputStreamCallback() {
+
+            @Override
+            public void process(OutputStream out) throws IOException {
+                handler.write(out);
+            }
+        };
+        try {
+            current = session.write(current, writer);
+            this.logger.info("Transfer flow file {} to {}", new Object[]{current, relationship});
+            session.transfer(current, relationship);
+        } catch (Exception e) {
+            this.logger.error("Could not create new flow file from script", e);
+            session.remove(current);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java
new file mode 100644
index 0000000..883b688
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * Assembles the source text for a Ruby script: a fixed prelude of Java
+ * imports followed by the contents of the user's script file.
+ */
+public enum JRubyScriptFactory {
+
+    INSTANCE;
+
+    // Prepended verbatim to every script so the listed classes are in scope.
+    private static final String PRELOADS = "include Java\n\n"
+            + "java_import 'org.apache.nifi.components.PropertyDescriptor'\n"
+            + "java_import 'org.apache.nifi.components.Validator'\n"
+            + "java_import 'org.apache.nifi.processor.util.StandardValidators'\n"
+            + "java_import 'org.apache.nifi.processor.Relationship'\n"
+            + "java_import 'org.apache.nifi.logging.ProcessorLog'\n"
+            + "java_import 'org.apache.nifi.scripting.ReaderScript'\n"
+            + "java_import 'org.apache.nifi.scripting.WriterScript'\n"
+            + "java_import 'org.apache.nifi.scripting.ConverterScript'\n\n";
+
+    /**
+     * Reads the script file as UTF-8 and returns it with the prelude in front.
+     *
+     * @param scriptFile the user's script
+     * @return prelude followed by the file's contents
+     * @throws IOException if the file cannot be read
+     */
+    public String getScript(File scriptFile) throws IOException {
+        final String userScript = FileUtils.readFileToString(scriptFile, "UTF-8");
+        return PRELOADS + userScript;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java
new file mode 100644
index 0000000..774fb1f
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Assembles the source text for a JavaScript script: a fixed prelude
+ * (package importer plus a minimal <code>require</code> implementation), a
+ * <code>$PATH</code> variable naming the script's directory, and finally the
+ * contents of the user's script file.
+ */
+public enum JavaScriptScriptFactory {
+
+    INSTANCE;
+
+    // readFile() resolves module names against $PATH, which getScript() defines
+    // after this prelude; that is fine because $PATH is only read when
+    // readFile() is actually called. Modules are read as UTF-8, the same
+    // encoding getScript() uses for the main script.
+    private static final String PRELOADS = "var Scripting = JavaImporter(\n"
+            + "        Packages.org.apache.nifi.components,\n"
+            + "        Packages.org.apache.nifi.processor.util,\n"
+            + "        Packages.org.apache.nifi.processor,\n"
+            + "        Packages.org.apache.nifi.logging,\n"
+            + "        Packages.org.apache.nifi.scripting,\n"
+            + "        Packages.org.apache.commons.io);\n"
+            + "var readFile = function (file) {\n"
+            + "  var script = Packages.org.apache.commons.io.FileUtils.readFileToString("
+            + "      new java.io.File($PATH, file), \"UTF-8\""
+            + "    );\n"
+            + "  return \"\" + script;\n"
+            + "}\n"
+            + "var require = function (file){\n"
+            + "  var exports={}, module={};\n"
+            + "  module.__defineGetter__('id', function(){return file;});"
+            + "  eval(readFile(file));\n"
+            + "  return exports;\n"
+            + "}\n";
+
+    /**
+     * Reads the script file as UTF-8 and returns it preceded by the prelude
+     * and the <code>$PATH</code> definition.
+     *
+     * @param scriptFile the user's script
+     * @return prelude, <code>$PATH</code> definition and the file's contents
+     * @throws IOException if the file cannot be read
+     */
+    public String getScript(File scriptFile) throws IOException {
+        final String userScript = FileUtils.readFileToString(scriptFile, "UTF-8");
+        return PRELOADS + "var $PATH = \"" + scriptDirectory(scriptFile) + "\"\n" + userScript;
+    }
+
+    /**
+     * Returns the script's directory in a form that is safe to embed in a
+     * double-quoted JavaScript string literal.
+     */
+    private static String scriptDirectory(File scriptFile) {
+        String parent = scriptFile.getParent();
+        if (parent == null) {
+            // A bare relative file name has no parent component; without this
+            // fallback $PATH would become the literal text "null".
+            parent = scriptFile.getAbsoluteFile().getParent();
+        }
+        final String forwardSlashed = StringUtils.replace(parent, "\\", "/");
+        // No backslashes remain at this point, so escaping the quote cannot
+        // interact with an existing escape sequence.
+        return StringUtils.replace(forwardSlashed, "\"", "\\\"");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java
new file mode 100644
index 0000000..6b40b5e
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * Assembles the source text for a Python script: a fixed prelude of imports
+ * followed by the contents of the user's script file.
+ */
+public enum JythonScriptFactory {
+
+    INSTANCE;
+
+    // Prepended verbatim to every script so the listed classes are in scope.
+    private static final String PRELOADS = "from org.python.core.util import FileUtil\n"
+            + "from org.apache.nifi.components import PropertyDescriptor\n"
+            + "from org.apache.nifi.components import Validator\n"
+            + "from org.apache.nifi.processor.util import StandardValidators\n"
+            + "from org.apache.nifi.processor import Relationship\n"
+            + "from org.apache.nifi.logging import ProcessorLog\n"
+            + "from org.apache.nifi.scripting import ReaderScript\n"
+            + "from org.apache.nifi.scripting import WriterScript\n"
+            + "from org.apache.nifi.scripting import ConverterScript\n";
+
+    /**
+     * Reads the script file as UTF-8 and returns it with the prelude in front.
+     *
+     * @param scriptFile the user's script
+     * @return prelude followed by the file's contents
+     * @throws IOException if the file cannot be read
+     */
+    public String getScript(File scriptFile) throws IOException {
+        final String userScript = FileUtils.readFileToString(scriptFile, "UTF-8");
+        return PRELOADS + userScript;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
new file mode 100644
index 0000000..d879722
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.OutputStream;
+
+/**
+ * Callback through which a caller supplies the bytes for an
+ * {@link OutputStream} that it is handed.
+ */
+public interface OutputStreamHandler {
+
+    /**
+     * Writes content to the supplied stream.
+     *
+     * The method declares no checked exceptions, so an implementation cannot
+     * let an {@link java.io.IOException} propagate directly.
+     *
+     * @param out the stream to write to
+     */
+    void write(OutputStream out);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
new file mode 100644
index 0000000..b1d89c0
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
+import org.apache.nifi.processor.Relationship;
+
+/**
+ * <p>
+ * Base class for script authors who want to follow the "reader" paradigm for
+ * NiFi processors: the script looks at the incoming content and picks the
+ * relationship it should be routed to.
+ * </p>
+ *
+ * <p>
+ * User scripts should implement {@link #route(InputStream)}, whose returned
+ * {@link Relationship} decides where the FlowFile goes. Scripts may also
+ * implement {@link #getRelationships()} to specify the available
+ * relationships.
+ * </p>
+ *
+ */
+public class ReaderScript extends Script {
+
+    // First callback map that offers a "route" entry; stays null when the
+    // no-arg constructor is used or no map offers one.
+    private Object routeCallback;
+
+    /**
+     * @param callbacks script-provided objects; the first one that is a Map
+     * containing the key "route" becomes the target of {@link #route(InputStream)}
+     */
+    public ReaderScript(Object... callbacks) {
+        super(callbacks);
+        for (Object candidate : callbacks) {
+            if (routeCallback == null && candidate instanceof Map<?, ?> && ((Map<?, ?>) candidate).containsKey("route")) {
+                routeCallback = candidate;
+                // Only the first match is kept, so nothing further can change.
+                break;
+            }
+        }
+    }
+
+    public ReaderScript() {
+
+    }
+
+    /**
+     * Convenience entry point: routes the input and remembers the result as
+     * the last route.
+     *
+     * @param input the content to examine
+     * @throws NoSuchMethodException propagated from {@link #route(InputStream)}
+     * @throws ScriptException propagated from {@link #route(InputStream)}
+     */
+    public void process(InputStream input) throws NoSuchMethodException, ScriptException {
+        this.lastRoute = route(input);
+    }
+
+    /**
+     * Subclasses should examine the provided input stream and decide which
+     * relationship the file will be sent down. This default implementation
+     * delegates to the "route" function of the callback captured by the
+     * constructor.
+     *
+     * @param in a Java InputStream containing the incoming FlowFile.
+     * @return the relationship chosen for the file
+     * @throws ScriptException if the script function fails
+     * @throws NoSuchMethodException if the callback has no "route" function
+     */
+    public Relationship route(InputStream in) throws NoSuchMethodException, ScriptException {
+        final Invocable scriptInvoker = (Invocable) this.engine;
+        return (Relationship) scriptInvoker.invokeMethod(routeCallback, "route", in);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
new file mode 100644
index 0000000..786f541
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.script.Invocable;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.Relationship;
+
+/**
+ * <p>
+ * Common base for every script type. In this framework, only ScriptEngines
+ * that implement javax.script.Invocable are supported: each callback dispatch
+ * in this class casts the engine to {@link Invocable}.
+ * </p>
+ *
+ */
+public class Script {
+
+    public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
+            .name("success")
+            .description("Destination of successfully created flow files")
+            .build();
+    public static final Relationship FAIL_RELATIONSHIP = new Relationship.Builder()
+            .name("failure")
+            .description("Destination of flow files when a error occurs in the script")
+            .build();
+
+    // Returned by getRelationships() when no callback overrides it.
+    static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final Set<Relationship> defaults = new HashSet<>();
+        defaults.add(FAIL_RELATIONSHIP);
+        defaults.add(SUCCESS_RELATIONSHIP);
+        RELATIONSHIPS = Collections.unmodifiableSet(defaults);
+    }
+
+    FlowFile flowFile = null;
+    ScriptEngine engine = null;
+
+    protected Map<String, String> properties = new HashMap<>();
+    protected Relationship lastRoute = SUCCESS_RELATIONSHIP;
+    protected ProcessorLog logger;
+    protected String scriptFileName;
+    protected Map<String, String> attributes = new HashMap<>();
+    protected long flowFileSize = 0;
+    protected long flowFileEntryDate = System.currentTimeMillis();
+
+    // The callback fields below exist because of an inadequate JavaScript ScriptEngine: it cannot
+    // subclass a Java class, only implement a Java interface. JavaScript scripts therefore look
+    // like subclasses but really construct a Script instance and pass functions as constructor
+    // arguments. Once the Nashorn JavaScript ScriptEngine (Java 8) is in use, these can go away
+    // and the subclasses of this class can become abstract again.
+    protected Object propDescCallback;
+    protected Object relationshipsCallback;
+    protected Object validateCallback;
+    protected Object exceptionRouteCallback;
+
+    /**
+     * Create a Script without any parameters
+     */
+    public Script() {
+    }
+
+    /**
+     * Create a Script from script-provided callback objects. For each of the
+     * functions "getPropertyDescriptors", "getRelationships", "validate" and
+     * "getExceptionRoute", the first argument that is a Map containing that
+     * key is remembered as the target for the corresponding method.
+     *
+     * @param callbacks candidate callback objects; non-Map entries are ignored
+     */
+    public Script(Object... callbacks) {
+        for (Object candidate : callbacks) {
+            if (!(candidate instanceof Map<?, ?>)) {
+                continue;
+            }
+            final Map<?, ?> functions = (Map<?, ?>) candidate;
+            if (propDescCallback == null && functions.containsKey("getPropertyDescriptors")) {
+                propDescCallback = candidate;
+            }
+            if (relationshipsCallback == null && functions.containsKey("getRelationships")) {
+                relationshipsCallback = candidate;
+            }
+            if (validateCallback == null && functions.containsKey("validate")) {
+                validateCallback = candidate;
+            }
+            if (exceptionRouteCallback == null && functions.containsKey("getExceptionRoute")) {
+                exceptionRouteCallback = candidate;
+            }
+        }
+    }
+
+    /**
+     * Specify a set of properties with corresponding NiFi validators.
+     *
+     * Subclasses that do not override this method will still have access to
+     * all properties via the "properties" field.
+     *
+     * @return the descriptors supplied by the script callback, or an empty
+     * list when no callback was registered
+     * @throws ScriptException if the script function fails
+     * @throws NoSuchMethodException if the callback lacks the function
+     */
+    @SuppressWarnings("unchecked")
+    public List<PropertyDescriptor> getPropertyDescriptors() throws NoSuchMethodException, ScriptException {
+        if (propDescCallback == null) {
+            return Collections.emptyList();
+        }
+        final Invocable invoker = (Invocable) engine;
+        return (List<PropertyDescriptor>) invoker.invokeMethod(propDescCallback, "getPropertyDescriptors", (Object) null);
+    }
+
+    /**
+     * Specify a set of reasons why this processor should be invalid.
+     *
+     * Subclasses that do not override this method will depend only on
+     * individual property validators as specified in
+     * {@link #getPropertyDescriptors()}.
+     *
+     * @return a Collection of messages to display to the user, or an empty
+     * Collection if the processor configuration is OK.
+     * @throws ScriptException if the script function fails
+     * @throws NoSuchMethodException if the callback lacks the function
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<String> validate() throws NoSuchMethodException, ScriptException {
+        if (validateCallback == null) {
+            return Collections.emptyList();
+        }
+        final Invocable invoker = (Invocable) engine;
+        return (Collection<String>) invoker.invokeMethod(validateCallback, "validate", (Object) null);
+    }
+
+    /**
+     * Remembers the flow file and, when it is non-null, snapshots its
+     * attributes, size and entry date. A null argument leaves the previously
+     * captured snapshot untouched.
+     */
+    void setFlowFile(FlowFile ff) {
+        flowFile = ff;
+        if (ff == null) {
+            return;
+        }
+        // have to clone because ff.getAttributes is unmodifiable
+        this.attributes = new HashMap<>(ff.getAttributes());
+        this.flowFileSize = ff.getSize();
+        this.flowFileEntryDate = ff.getEntryDate();
+    }
+
+    /**
+     * Replaces the property map with a copy of the given one.
+     */
+    void setProperties(Map<String, String> map) {
+        properties = new HashMap<>(map);
+    }
+
+    /**
+     * Required to access entire properties map -- Jython (at least) won't let
+     * you read the member variable without a getter
+     *
+     * @return entire parameter map
+     */
+    // change back to protected when we get nashorn
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    /**
+     * Get the named parameter. Some scripting languages make a method call
+     * easier than accessing a member field, so this is a convenience method to
+     * look up values in the properties field.
+     *
+     * @param key a hash key
+     * @return the value pointed at by the key specified
+     */
+    public String getProperty(String key) {
+        return properties.get(key);
+    }
+
+    /**
+     * Name the various relationships by which a file can leave this processor.
+     * Subclasses may override this method to change available relationships.
+     *
+     * @return the relationships supplied by the script callback, or the
+     * default success/failure pair when no callback was registered
+     * @throws ScriptException if the script function fails
+     * @throws NoSuchMethodException if the callback lacks the function
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<Relationship> getRelationships() throws NoSuchMethodException, ScriptException {
+        if (relationshipsCallback == null) {
+            return RELATIONSHIPS;
+        }
+        final Invocable invoker = (Invocable) engine;
+        return (Collection<Relationship>) invoker.invokeMethod(relationshipsCallback, "getRelationships", (Object) null);
+    }
+
+    /**
+     * Determine what to do with a file that has just been processed.
+     *
+     * After a script runs its "read" or "write" method, it should update the
+     * "lastRoute" field to specify the relationship to which the resulting file
+     * will be sent.
+     *
+     * @return the most recently recorded relationship
+     */
+    public Relationship getRoute() {
+        return lastRoute;
+    }
+
+    // Required because of a potential issue in Rhino -- protected methods are visible in
+    // subclasses but protected fields (like "lastRoute") are not
+    // change back to protected when we get nashorn
+    public void setRoute(Relationship route) {
+        lastRoute = route;
+    }
+
+    /**
+     * Determine where to send a file if an exception is thrown during
+     * processing.
+     *
+     * Subclasses may override this method to use a different relationship, or
+     * to determine the relationship dynamically. Returning null causes the file
+     * to be deleted instead.
+     *
+     * Defaults to "failure".
+     *
+     * @return the relationship to use in event of an exception, or null to
+     * delete the file.
+     * @throws ScriptException if the script function fails
+     * @throws NoSuchMethodException if the callback lacks the function
+     */
+    public Relationship getExceptionRoute() throws NoSuchMethodException, ScriptException {
+        if (exceptionRouteCallback == null) {
+            return FAIL_RELATIONSHIP;
+        }
+        final Invocable invoker = (Invocable) engine;
+        return (Relationship) invoker.invokeMethod(exceptionRouteCallback, "getExceptionRoute", (Object) null);
+    }
+
+    /*
+     * Convenience accessor for the incoming flow file's size; some scripting languages find a method call easier than
+     * reading a member field.
+     */
+    // Change back to protected when we get nashorn
+    public long getFlowFileSize() {
+        return flowFileSize;
+    }
+
+    /*
+     * Convenience accessor for the flow file's entry date; some scripting languages find a method call easier than
+     * reading a member field.
+     */
+    // Change back to protected when we get nashorn
+    public long getFlowFileEntryDate() {
+        return flowFileEntryDate;
+    }
+
+    void setLogger(ProcessorLog logger) {
+        this.logger = logger;
+    }
+
+    /*
+     * Needed so that scripts in some languages can read the logger. Jython (at least) won't let you read the member
+     * variable without a getter
+     */
+    protected ProcessorLog getLogger() {
+        return this.logger;
+    }
+
+    void setFileName(String scriptFileName) {
+        this.scriptFileName = scriptFileName;
+    }
+
+    public String getFileName() {
+        return this.scriptFileName;
+    }
+
+    // this one's public because it's needed by ExecuteScript to update the flow file's attributes AFTER processing is done
+    public Map<String, String> getAttributes() {
+        return this.attributes;
+    }
+
+    /*
+     * Convenience lookup in the attributes field; some scripting languages find a method call easier than reading a
+     * member field.
+     */
+    // Change back to protected when we get nashorn
+    public String getAttribute(String key) {
+        return this.attributes.get(key);
+    }
+
+    /*
+     * Convenience method to store a key/value pair in the attributes field; some scripting languages find a method
+     * call easier than accessing a member field.
+     */
+    // Change back to protected when we get nashorn
+    public void setAttribute(String key, String value) {
+        this.attributes.put(key, value);
+    }
+
+    void setEngine(ScriptEngine scriptEngine) {
+        this.engine = scriptEngine;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
new file mode 100644
index 0000000..6f38886
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.jruby.embed.PropertyName;
+
+/**
+ * Looks up {@link ScriptEngine} instances by script file extension and keeps
+ * a per-extension cache of those engines whose factory reports a THREADING
+ * parameter of THREAD-ISOLATED, STATELESS or MULTITHREADED.
+ */
+public class ScriptEngineFactory {
+
+    private static final String THREADING = "THREADING";
+    private static final String MULTITHREADED = "MULTITHREADED";
+    private static final String STATELESS = "STATELESS";
+    private static final String THREAD_ISOLATED = "THREAD-ISOLATED";
+    final static ScriptEngineManager scriptEngMgr;
+
+    static {
+        // JRuby embedding options are set as system properties before the
+        // engine manager is created.
+        System.setProperty(PropertyName.LOCALCONTEXT_SCOPE.toString(), "singlethread");
+        System.setProperty(PropertyName.COMPILEMODE.toString(), "jit");
+        System.setProperty(PropertyName.COMPATVERSION.toString(), "JRuby1.9");
+        System.setProperty(PropertyName.LOCALVARIABLE_BEHAVIOR.toString(), "transient");
+        System.setProperty("compile.invokedynamic", "false");
+        System.setProperty(PropertyName.LAZINESS.toString(), "true");
+        scriptEngMgr = new ScriptEngineManager();
+    }
+    final ConcurrentHashMap<String, ScriptEngine> threadSafeEngines = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the cached engine for the extension if there is one; otherwise
+     * obtains an engine from the manager and caches it when its factory
+     * reports one of the accepted threading modes.
+     *
+     * @param extension script file extension, e.g. "rb" or "py"
+     * @return an engine for the extension
+     * @throws IllegalArgumentException if no engine exists for the extension
+     */
+    ScriptEngine getEngine(String extension) {
+        ScriptEngine engine = threadSafeEngines.get(extension);
+        if (null == engine) {
+            engine = lookupEngine(extension);
+            if (reportsSharableThreading(engine)) {
+                // Another caller may have cached an engine in the meantime; prefer that one.
+                ScriptEngine cachedEngine = threadSafeEngines.putIfAbsent(extension, engine);
+                if (null != cachedEngine) {
+                    engine = cachedEngine;
+                }
+            }
+        }
+        return engine;
+    }
+
+    /**
+     * Obtains an engine from the manager, evaluates language-specific
+     * load-path setup for "rb" and "py" scripts, and replaces any cached
+     * engine for the extension when the engine's factory reports one of the
+     * accepted threading modes.
+     *
+     * @param scriptFile the script the engine is being prepared for
+     * @param extension script file extension, e.g. "rb" or "py"
+     * @return the newly obtained engine
+     * @throws ScriptException if evaluating the setup code fails
+     * @throws IllegalArgumentException if no engine exists for the extension
+     */
+    ScriptEngine getNewEngine(File scriptFile, String extension) throws ScriptException {
+        ScriptEngine engine = lookupEngine(extension);
+        // Initialize some paths
+        switch (extension) {
+            case "rb":
+                engine.eval(rubyPathSetup(scriptFile));
+                break;
+            case "py":
+                engine.eval(pythonPathSetup(scriptFile));
+                break;
+            default:
+                break;
+        }
+
+        if (reportsSharableThreading(engine)) {
+            // replace prior instance if any
+            threadSafeEngines.put(extension, engine);
+        }
+        return engine;
+    }
+
+    /**
+     * @param scriptExtension script file extension
+     * @return true if an engine for the extension is currently cached
+     */
+    boolean isThreadSafe(String scriptExtension) {
+        return threadSafeEngines.containsKey(scriptExtension);
+    }
+
+    /** Asks the manager for an engine and fails loudly when none is registered. */
+    private static ScriptEngine lookupEngine(String extension) {
+        ScriptEngine engine = scriptEngMgr.getEngineByExtension(extension);
+        if (null == engine) {
+            throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension);
+        }
+        return engine;
+    }
+
+    /** True when the engine's factory reports one of the threading modes this class caches. */
+    private static boolean reportsSharableThreading(ScriptEngine engine) {
+        Object threading = engine.getFactory().getParameter(THREADING);
+        // the MULTITHREADED status means that the scripts need to be careful about sharing state
+        return THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading);
+    }
+
+    /** Ruby code that puts the script's directory and its "lib" subdirectory on the load path. */
+    private static String rubyPathSetup(File scriptFile) {
+        final String parent = quotedLiteralPath(parentOf(scriptFile));
+        return "$:.unshift '" + parent + "'\n"
+                + "$:.unshift File.join '" + parent + "', 'lib'\n";
+    }
+
+    /** Python code that extends sys.path like the Ruby setup and defines __file__. */
+    private static String pythonPathSetup(File scriptFile) {
+        final String parent = quotedLiteralPath(parentOf(scriptFile));
+        final String lib = parent + "/lib";
+        // The absolute path goes through the same normalisation as the parent:
+        // raw Windows backslashes would act as escape sequences inside the
+        // Python string literal.
+        final String absolutePath = quotedLiteralPath(scriptFile.getAbsolutePath());
+        return "import sys\n"
+                + "sys.path.append('" + parent + "')\n"
+                + "sys.path.append('" + lib + "')\n"
+                + "__file__ = '" + absolutePath + "'\n";
+    }
+
+    /** Directory of the script; falls back to the absolute form when the path has no parent part. */
+    private static String parentOf(File scriptFile) {
+        final String parent = scriptFile.getParent();
+        // A bare relative file name has no parent component; without the
+        // fallback the text "null" would end up on the load path.
+        return parent != null ? parent : scriptFile.getAbsoluteFile().getParent();
+    }
+
+    /** Makes a path safe to embed between single quotes in Ruby or Python source. */
+    private static String quotedLiteralPath(String path) {
+        final String forwardSlashed = StringUtils.replace(path, "\\", "/");
+        // No backslashes remain at this point, so escaping the apostrophe
+        // cannot interact with an existing escape sequence.
+        return StringUtils.replace(forwardSlashed, "'", "\\'");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
new file mode 100644
index 0000000..da18606
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.logging.ProcessorLog;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * While this is a 'factory', it is not a singleton because we want one factory
+ * per processor. This factory has state, all of which belongs to exactly one
+ * processor.
+ *
+ */
+public class ScriptFactory {
+
+    private final ScriptEngineFactory engineFactory = new ScriptEngineFactory(); // source of script engines, looked up by file extension
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // getScript() works under the read lock and reloads under the write lock
+    private final ReadLock readLock = lock.readLock();
+    private final WriteLock writeLock = lock.writeLock();
+    private final ProcessorLog logger;
+
+    private volatile CompiledScript compiledScript; // assigned by updateEngine() only when the engine is Compilable; getScript() nulls it before each reload
+    private volatile String scriptText; // script source as last loaded by updateEngine()
+    private volatile byte[] md5Hash; // MD5 of the script file as of the last (re)load
+    private volatile long lastTimeChecked; // System.currentTimeMillis() of the last load or change check; reset to 0 after a ScriptException
+    private volatile String scriptFileName; // file currently loaded; getScript() nulls it after a ScriptException to force a reload
+    private volatile long scriptCheckIntervalMS = 15000; // minimum milliseconds between MD5 change checks
+
+    public ScriptFactory(ProcessorLog logger) { // the logger is bound into script variables and set on every Script handed out
+        this.logger = logger;
+    }
+
+    public void setScriptCheckIntervalMS(long msecs) { // replaces the 15000 ms default gap between script file change checks
+        this.scriptCheckIntervalMS = msecs;
+    }
+
+    /** Returns a Script for the given file, rebuilding the engine first when the file name or the file's MD5 hash changed.
+     * @param aScriptFileName script file to run; compared with the file name currently loaded
+     * @param properties bound into the script's variables and passed to the instance via setProperties
+     * @param flowFile passed to the returned instance via setFlowFile
+     * @return the Script produced by evaluating the script, with file name, properties, logger and flow file set
+     * @throws IOException if hashing or reading the script file fails
+     * @throws ScriptException if the script cannot be compiled or evaluated; load state is reset first when evaluation fails
+     */
+    public Script getScript(final String aScriptFileName, final Map<String, String> properties, final FlowFile flowFile)
+            throws IOException, ScriptException {
+        final Script instance;
+        long now = System.currentTimeMillis();
+        readLock.lock();
+        try {
+            if (!aScriptFileName.equals(this.scriptFileName)) {
+                readLock.unlock(); // a ReentrantReadWriteLock cannot upgrade, so give up the read lock before taking the write lock
+                writeLock.lock();
+                try {
+                    if (!aScriptFileName.equals(this.scriptFileName)) {
+                        // a different script file was requested: need to get brand new engine
+                        compiledScript = null;
+                        this.md5Hash = getMD5Hash(aScriptFileName);
+                        this.lastTimeChecked = now;
+                        this.scriptFileName = aScriptFileName; // must be set first: updateEngine() reads this field
+                        updateEngine(); // NOTE(review): an IOException here leaves the new name set with the previous scriptText - confirm intended
+                    } // else another thread beat me to the change...so just get a script
+                } finally {
+                    readLock.lock(); // downgrade: take the read lock back before releasing the write lock
+                    writeLock.unlock();
+                }
+            } else if (lastTimeChecked + scriptCheckIntervalMS < now) { // same file, but the check interval has elapsed
+                readLock.unlock();
+                writeLock.lock();
+                try {
+                    if (lastTimeChecked + scriptCheckIntervalMS < now) {
+                        byte[] md5 = getMD5Hash(this.scriptFileName);
+                        if (!MessageDigest.isEqual(md5Hash, md5)) {
+                            // the file content changed on disk: need to get brand new engine
+                            compiledScript = null;
+                            updateEngine();
+                            this.md5Hash = md5;
+                        } // else no change to script, so just update time checked
+                        this.lastTimeChecked = now;
+                    } // else another thread beat me to the check...so just get a script
+                } finally {
+                    readLock.lock();
+                    writeLock.unlock();
+                }
+            }
+            try {
+                instance = getScriptInstance(properties);
+                instance.setFileName(this.scriptFileName);
+                instance.setProperties(properties);
+                instance.setLogger(logger);
+                instance.setFlowFile(flowFile);
+            } catch (ScriptException e) {
+                // reset state so the next call sees a "different" file name and re-initializes (only the read lock is held here)
+                this.lastTimeChecked = 0;
+                this.scriptFileName = null;
+                throw e;
+            }
+        } finally {
+            readLock.unlock();
+        }
+
+        return instance;
+
+    }
+
+    public Script getScript(String aScriptFileName) throws ScriptException, IOException {
+        // convenience form: a fresh, empty property map and no flow file
+        return getScript(aScriptFileName, new HashMap<String, String>(), null);
+    }
+
+    private byte[] getMD5Hash(String aScriptFileName) throws FileNotFoundException, IOException {
+        byte[] messageDigest = null;
+        try (FileInputStream fis = new FileInputStream(aScriptFileName);
+                DigestInputStream dis = new DigestInputStream(new BufferedInputStream(fis), MessageDigest.getInstance("MD5"))) {
+
+            byte[] bytes = new byte[8192];
+            while (dis.read(bytes) != -1) {
+                // do nothing...just computing the md5 hash
+            }
+            messageDigest = dis.getMessageDigest().digest();
+        } catch (NoSuchAlgorithmException swallow) {
+            // MD5 is a legitimate format
+        }
+        return messageDigest;
+    }
+
+    private String getScriptText(File scriptFile, String extension) throws IOException {
+        // Produces the source text that will be evaluated for the given script file.
+        // Ruby, JavaScript and Python scripts are loaded through their
+        // language-specific script factories; a file with any other extension
+        // is read with FileUtils.readFileToString.
+        switch (extension) {
+            case "rb":
+                return JRubyScriptFactory.INSTANCE.getScript(scriptFile);
+
+            case "js":
+                return JavaScriptScriptFactory.INSTANCE.getScript(scriptFile);
+
+            case "py":
+                return JythonScriptFactory.INSTANCE.getScript(scriptFile);
+
+            default:
+                // no dedicated factory for this extension
+                return FileUtils.readFileToString(scriptFile);
+        }
+    }
+
+    private Script getScriptInstance(final Map<String, String> properties) throws ScriptException {
+        Map<String, Object> localThreadVariables = new HashMap<>(); // bindings handed to the script for this call only
+        final String extension = getExtension(scriptFileName);
+        localThreadVariables.put(getVariableName("GLOBAL", "logger", extension), logger);
+        localThreadVariables.put(getVariableName("INSTANCE", "properties", extension), properties);
+        localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName);
+        final Bindings bindings = new SimpleBindings(localThreadVariables);
+        final ScriptEngine scriptEngine = engineFactory.getEngine(extension);
+        Script instance;
+        if (compiledScript == null) { // no compiled form available: evaluate the source text directly
+            instance = (Script) scriptEngine.eval(scriptText, bindings);
+            if (instance == null) { // which it will be for python and also for local variables in javascript
+                instance = (Script) scriptEngine.eval("instance", bindings);
+            }
+        } else {
+            instance = (Script) compiledScript.eval(bindings);
+            if (instance == null) { // which it will be for python and also for local variables in javascript
+                instance = (Script) compiledScript.getEngine().eval("instance", bindings);
+            }
+        }
+        if (instance == null) { // report a declared, diagnosable error instead of a NullPointerException on setEngine
+            throw new ScriptException("Script " + scriptFileName + " did not produce a Script instance");
+        }
+        instance.setEngine(scriptEngine);
+        return instance;
+    }
+
+    /*
+     * Rebuilds the engine, script text and (when supported) compiled script for scriptFileName. Must have writeLock when calling this!!!!
+     */
+    private void updateEngine() throws IOException, ScriptException {
+        final String extension = getExtension(scriptFileName);
+        // ask the engine factory for a new engine, passing the script file and its extension
+        File scriptFile = new File(this.scriptFileName);
+        ScriptEngine scriptEngine = engineFactory.getNewEngine(scriptFile, extension);
+        scriptText = getScriptText(scriptFile, extension);
+        Map<String, Object> localThreadVariables = new HashMap<>();
+        String loggerVariableKey = getVariableName("GLOBAL", "logger", extension);
+        localThreadVariables.put(loggerVariableKey, logger);
+        String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension);
+        localThreadVariables.put(propertiesVariableKey, new HashMap<String, String>()); // empty placeholder; getScriptInstance binds the real properties per call
+        localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName);
+        if (scriptEngine instanceof Compilable) { // otherwise compiledScript keeps the value the caller left (getScript nulls it beforehand)
+            Bindings bindings = new SimpleBindings(localThreadVariables);
+            scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
+            compiledScript = ((Compilable) scriptEngine).compile(scriptText);
+        }
+        logger.debug("Updating Engine!!");
+    }
+
+    private String getVariableName(String scope, String variableName, String extension) {
+        // Maps a logical variable to the name the script language expects.
+        // Only Ruby ("rb") decorates names: "GLOBAL" gets a '$' prefix and
+        // "INSTANCE" gets an '@' prefix. Every other combination keeps the
+        // plain variable name.
+
+        if (!extension.equals("rb")) {
+            // non-Ruby engines take the name unchanged
+            return variableName;
+        }
+
+        if (scope.equals("GLOBAL")) {
+            // Ruby global variable
+            return '$' + variableName;
+        }
+
+        if (scope.equals("INSTANCE")) {
+            // Ruby instance variable
+            return '@' + variableName;
+        }
+
+        // unrecognised scope: no decoration
+        return variableName;
+    }
+
+    private String getExtension(String aScriptFileName) { // returns the text after the last dot of the file's last path element
+        final int nameStart = Math.max(aScriptFileName.lastIndexOf('/'), aScriptFileName.lastIndexOf('\\')) + 1;
+        final int dotPos = aScriptFileName.lastIndexOf('.');
+        // a dot in a directory name, a leading dot, or a trailing dot does not count as an extension
+        if (dotPos <= nameStart || dotPos == aScriptFileName.length() - 1) {
+            throw new IllegalArgumentException("Script file name must have an extension");
+        }
+        return aScriptFileName.substring(dotPos + 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
new file mode 100644
index 0000000..7eef98b
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
+/**
+ * <p>
+ * Script authors should extend this class if they want to follow the
+ * "processCallback" paradigm for NiFi processors.
+ * </p>
+ *
+ * <p>
+ * At a minimum, scripts must implement
+ * <code>process(InputStream, OutputStream)</code>.
+ * </p>
+ *
+ * <p>
+ * By default, all files processed will be sent to the relationship
+ * <em>success</em>, unless the script raises an exception, in which
+ * case the file will be sent to <em>failure</em>. Implement
+ * {@link #getProcessorRelationships()} and/or {@link #getRoute()} to change
+ * this behavior.
+ * </p>
+ *
+ */
+public class WriterScript extends Script {
+
+    private Object processCallback;
+
+    public WriterScript() {
+        // nothing to set up: this constructor registers no callback object
+    }
+
+    public WriterScript(Object... callbacks) {
+        super(callbacks);
+        for (final Object candidate : callbacks) {
+            if (candidate instanceof Map<?, ?> && ((Map<?, ?>) candidate).containsKey("process")) {
+                processCallback = candidate;
+                break; // the first map offering a "process" entry wins
+            }
+        }
+    }
+
+    public void process(InputStream in, OutputStream out) throws NoSuchMethodException, ScriptException {
+        ((Invocable) engine).invokeMethod(processCallback, "process", in, out); // calls "process" on the callback chosen by the constructor
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..20a3982
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.script.ExecuteScript