You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:55 UTC
[32/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
new file mode 100644
index 0000000..9058cf4
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.script;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.script.ScriptException;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.EventDriven;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.scripting.ConverterScript;
+import org.apache.nifi.scripting.ReaderScript;
+import org.apache.nifi.scripting.Script;
+import org.apache.nifi.scripting.ScriptFactory;
+import org.apache.nifi.scripting.WriterScript;
+
+/**
+ * <!-- Processor Documentation ================================================== -->
+ * <h2>Description:</h2>
+ * <p>
+ * This processor provides the capability to execute scripts in various
+ * scripting languages, and passes into the scripts the input stream and output
+ * stream(s) representing an incoming flow file and any created flow files. The
+ * processor is designed to be thread safe, so multiple concurrent tasks may
+ * execute against a single script. The processor provides a framework which
+ * enables script writers to implement 3 different types of scripts:
+ * <ul>
+ * <li>ReaderScript - which enables stream-based reading of a FlowFile's
+ * content</li>
+ * <li>WriterScript - which enables stream-based reading and
+ * writing/modifying of a FlowFile's content</li>
+ * <li>ConverterScript - which enables stream-based reading of a FlowFile's
+ * content and stream-based writing to newly created FlowFiles</li>
+ * </ul>
+ * Presently, the processor supports 3 scripting languages: Ruby, Python, and
+ * JavaScript. The processor is built on the javax.script API which enables
+ * ScriptEngine discovery, thread management, and encapsulates much of the low
+ * level bridging-code that enables Java to Script language integration. Thus,
+ * it is designed to be easily extended to other scripting languages. <br/> The
+ * attributes of a FlowFile and properties of the Processor are exposed to the
+ * script by either a variable in the base class or a getter method. A script
+ * may declare new Processor Properties and different Relationships via
+ * overriding the getPropertyDescriptors and getRelationships methods,
+ * respectively.
+ * </p>
+ * <p>
+ * <strong>Properties:</strong>
+ * </p>
+ * <p>
+ * In the list below, the names of required properties appear in bold. Any other
+ * properties (not in bold) are considered optional. If a property has a default
+ * value, it is indicated. If a property supports the use of the NiFi Expression
+ * Language (or simply, "expression language"), that is also indicated. Of
+ * particular note: This processor allows scripts to define additional Processor
+ * properties, which will not be initially visible. Once the processor's
+ * configuration is validated, script defined properties will become visible,
+ * and may affect the validity of the processor.
+ * </p>
+ * <ul>
+ * <li>
+ * <strong>Script File Name</strong>
+ * <ul>
+ * <li>Script location, can be relative or absolute path.</li>
+ * <li>Default value: no default</li>
+ * <li>Supports expression language: false</li>
+ * </ul>
+ * </li>
+ * <li>
+ * <strong>Script Check Interval</strong>
+ * <ul>
+ * <li>The time period between checking for updates to a script.</li>
+ * <li>Default value: 15 sec</li>
+ * <li>Supports expression language: false</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * <strong>Relationships:</strong>
+ * </p>
+ * <p>
+ * The initial 'out of the box' relationships are below. Of particular note is
+ * the ability of a script to change the set of relationships. However, any
+ * relationships defined by the script will not be visible until the processor's
+ * configuration has been validated. Once done, new relationships will become
+ * visible.
+ * </p>
+ * <ul>
+ * <li>
+ * success
+ * <ul>
+ * <li>Used when a file is successfully processed by a script.</li>
+ * </ul>
+ * </li>
+ * <li>
+ * failure
+ * <ul>
+ * <li>Used when an error occurs while processing a file with a script.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * <strong>Example Scripts:</strong>
+ * </p>
+ * <ul>
+ * JavaScript example - the 'with' statement imports packages defined in the
+ * framework. Since the 'instance' variable is intended to be local scope (not
+ * global), it must be named 'instance' because it is not passed back to the
+ * processor upon script evaluation and must be fetched by name. If you make it global,
+ * you can name it whatever you'd like...but this is intended to be
+ * multi-threaded so do so at your own risk. Presently, there are issues with
+ * the JavaScript scripting engine that prevent sub-classing the base classes in
+ * the Processor's Java framework. So, what is actually happening is an instance
+ * of the ReaderScript is created with a provided callback object. When we are
+ * able to move to a more competent scripting engine, the code below will remain
+ * the same, but the 'instance' variable will actually be a sub-class of
+ * ReaderScript.
+ *
+ * <pre>
+ * with (Scripting) {
+ * var instance = new ReaderScript({
+ * route : function(input) {
+ * var str = IOUtils.toString(input);
+ * var expr = instance.getProperty("expr");
+ * filename = instance.attributes.get("filename");
+ * instance.setAttribute("filename", filename + ".modified");
+ * if (str.match(expr)) {
+ * return Script.FAIL_RELATIONSHIP;
+ * } else {
+ * return Script.SUCCESS_RELATIONSHIP;
+ * }
+ * }
+ * });
+ * }
+ * </pre>
+ *
+ * Ruby example - the 'OutputStreamHandler' is an interface which is called when
+ * creating flow files.
+ *
+ * <pre>
+ * java_import 'org.apache.nifi.scripting.OutputStreamHandler'
+ * class SimpleConverter < ConverterScript
+ * field_reader :FAIL_RELATIONSHIP, :SUCCESS_RELATIONSHIP, :logger, :attributes
+ *
+ * def convert(input)
+ * in_io = input.to_io
+ * createFlowFile("firstLine", FAIL_RELATIONSHIP, OutputStreamHandler.impl do |method, out|
+ * out_io = out.to_io
+ * out_io << in_io.readline.to_java_bytes
+ * out_io.close
+ * logger.debug("Wrote data to failure...this message logged with logger from super class")
+ * end)
+ *
+ * createFlowFile("otherLines", SUCCESS_RELATIONSHIP, OutputStreamHandler.impl do |method, out|
+ * out_io = out.to_io
+ * in_io.each_line { |line|
+ * out_io << line
+ * }
+ * out_io.close
+ * logger.debug("Wrote data to success...this message logged with logger from super class")
+ * end)
+ * in_io.close
+ * end
+ *
+ * end
+ *
+ * $logger.debug("Creating SimpleConverter...this message logged with logger from shared variables")
+ * SimpleConverter.new
+ * </pre>
+ *
+ * Python example - The difficulty with Python is that it does not return
+ * objects upon script evaluation, so the instance of the Script class must be
+ * fetched by name. Thus, you must define a variable called 'instance'.
+ *
+ * <pre>
+ * import re
+ *
+ * class RoutingReader(ReaderScript):
+ * A = Relationship.Builder().name("a").description("some good stuff").build()
+ * B = Relationship.Builder().name("b").description("some other stuff").build()
+ * C = Relationship.Builder().name("c").description("some bad stuff").build()
+ *
+ * def getRelationships(self):
+ * return [self.A,self.B,self.C]
+ *
+ * def getExceptionRoute(self):
+ * return self.C
+ *
+ * def route( self, input ):
+ * for line in FileUtil.wrap(input):
+ * if re.match("^bad", line, re.IGNORECASE):
+ * return self.B
+ * if re.match("^sed", line):
+ * raise RuntimeError("That's no good!")
+ *
+ * return self.A
+ *
+ * instance = RoutingReader()
+ * </pre>
+ *
+ * </ul>
+ * <p>
+ * <strong>Shared Variables</strong>
+ * </p>
+ * <ul>
+ * <li>logger : global scope</li>
+ * <li>properties : local/instance scope</li>
+ * </ul>
+ * <p>
+ * <strong>Script API:</strong>
+ * </p>
+ * <ul>
+ * <li>getAttribute(String) : String</li>
+ * <li>getAttributes() : Map(String,String)</li>
+ * <li>getExceptionRoute() : Relationship</li>
+ * <li>getFileName() : String</li>
+ * <li>getFlowFileEntryDate() : Calendar</li>
+ * <li>getFlowFileSize() : long</li>
+ * <li>getProperties() : Map(String, String)</li>
+ * <li>getProperty(String) : String</li>
+ * <li>getPropertyDescriptors() : List(PropertyDescriptor)</li>
+ * <li>getRelationships() : Collection(Relationship)</li>
+ * <li>getRoute() : Relationship</li>
+ * <li>setRoute(Relationship)</li>
+ * <li>setAttribute(String, String)</li>
+ * <li>validate() : Collection(String)</li>
+ * </ul>
+ * <p>
+ * <strong>ReaderScript API:</strong>
+ * </p>
+ * <ul>
+ * <li>route(InputStream) : Relationship</li>
+ * </ul>
+ * <p>
+ * <strong>WriterScript API:</strong>
+ * </p>
+ * <ul>
+ * <li>process(InputStream, OutputStream)</li>
+ * </ul>
+ * <p>
+ * <strong>ConverterScript API:</strong>
+ * </p>
+ * <ul>
+ * <li>convert(InputStream)</li>
+ * <li>createFlowFile(String, Relationship, OutputStreamHandler)</li>
+ * </ul>
+ * <p>
+ * <strong>OutputStreamHandler API:</strong>
+ * </p>
+ * <ul>
+ * <li>write(OutputStream)</li>
+ * </ul>
+ */
+@EventDriven
+@Tags({"script", "ruby", "python", "javascript", "execute"})
+@CapabilityDescription("Execute scripts in various scripting languages, and passes into the scripts the input stream and output stream(s) "
+        + "representing an incoming flow file and any created flow files.")
+public class ExecuteScript extends AbstractProcessor {
+
+    // When true, the next customValidate pass (re)loads the script and checks it.
+    private final AtomicBoolean doCustomValidate = new AtomicBoolean(true);
+    // Relationships advertised by the processor; replaced by the script's relationships on validation.
+    private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+    // Built-in descriptors followed by the descriptors declared by the most recently validated script.
+    private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>();
+    private volatile ScriptFactory scriptFactory;
+    // Relationship used when a loaded script fails; taken from the script during validation.
+    private volatile Relationship exceptionRoute;
+
+    /**
+     * Script location, can be relative or absolute path -- passed as-is to
+     * {@link File#File(String) File constructor}. The file must exist and its
+     * file name (not merely some directory on its path) must have a non-empty
+     * extension.
+     */
+    public static final PropertyDescriptor SCRIPT_FILE_NAME = new PropertyDescriptor.Builder()
+            .name("Script File Name")
+            .description("Script location, can be relative or absolute path")
+            .required(true)
+            .addValidator(new Validator() {
+
+                @Override
+                public ValidationResult validate(String subject, String input, ValidationContext context) {
+                    ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, input, context);
+                    if (result.isValid()) {
+                        // Inspect only the file name: a dot in a parent directory
+                        // (e.g. "/opt/scripts.d/run") is not an extension.
+                        String name = new File(input).getName();
+                        int dotPos = name.lastIndexOf('.');
+                        // dotPos < 1: no dot, or only a leading dot; last position: empty extension
+                        if (dotPos < 1 || dotPos == name.length() - 1) {
+                            result = new ValidationResult.Builder()
+                                    .subject(subject)
+                                    .valid(false)
+                                    .explanation("Filename must have an extension")
+                                    .input(input)
+                                    .build();
+                        }
+                    }
+                    return result;
+                }
+            })
+            .build();
+
+    static final PropertyDescriptor SCRIPT_CHECK_INTERVAL = new PropertyDescriptor.Builder()
+            .name("Script Check Interval")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .description("The time period between checking for updates to a script")
+            .required(true)
+            .defaultValue("15 sec")
+            .build();
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        Set<Relationship> empty = Collections.emptySet();
+        relationships.set(empty);
+        propertyDescriptors.set(buildPropertyDescriptors(Collections.<PropertyDescriptor>emptyList()));
+        scriptFactory = new ScriptFactory(getLogger());
+    }
+
+    /**
+     * Builds the immutable list of supported property descriptors: the
+     * built-in descriptors followed by {@code scriptDescriptors}.
+     *
+     * @param scriptDescriptors descriptors declared by the script
+     * @return an unmodifiable list of all supported descriptors
+     */
+    private static List<PropertyDescriptor> buildPropertyDescriptors(List<PropertyDescriptor> scriptDescriptors) {
+        List<PropertyDescriptor> descriptors = new ArrayList<>(2 + scriptDescriptors.size());
+        descriptors.add(SCRIPT_FILE_NAME);
+        descriptors.add(SCRIPT_CHECK_INTERVAL);
+        descriptors.addAll(scriptDescriptors);
+        return Collections.unmodifiableList(descriptors);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors.get();
+    }
+
+    /**
+     * Accepts any dynamic property name, with no value validation, so that
+     * scripts can read properties they do not declare.
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .dynamic(true)
+                .addValidator(Validator.VALID)
+                .build();
+    }
+
+    /**
+     * Any property change forces the script to be reloaded and re-checked on
+     * the next validation pass.
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        doCustomValidate.set(true);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships.get();
+    }
+
+    /**
+     * Called by framework.
+     *
+     * Returns a list of reasons why this processor cannot be run. The script
+     * is loaded and checked only when a re-check is pending: after a property
+     * change, or after a previous pass that could not load the script or found
+     * problems. Loading the script refreshes the processor's relationships,
+     * supported property descriptors and exception route.
+     *
+     * @param validationContext provides the current property values
+     * @return the problems found; empty when there are none or when no re-check
+     *         was pending
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        if (!doCustomValidate.getAndSet(false)) {
+            // the last completed check found no problems and nothing has changed since
+            return Collections.emptyList();
+        }
+        long interval = validationContext.getProperty(SCRIPT_CHECK_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+        scriptFactory.setScriptCheckIntervalMS(interval);
+        List<ValidationResult> results = new ArrayList<>();
+        String file = validationContext.getProperty(SCRIPT_FILE_NAME).getValue();
+        if (file == null) {
+            // SCRIPT_FILE_NAME is required, so its absence is reported by the property's
+            // own validation; re-check once a value is supplied.
+            doCustomValidate.set(true);
+            return results;
+        }
+        try {
+            Script s = scriptFactory.getScript(file);
+
+            // set the relationships of the processor (read-only view for callers)
+            Set<Relationship> scriptRelationships = new HashSet<>(s.getRelationships());
+            relationships.set(Collections.unmodifiableSet(scriptRelationships));
+
+            // need to get script's prop. descs. and validate. May, or may not, have dynamic
+            // props already...depends if this is the first time the processor is being configured.
+            Map<PropertyDescriptor, String> properties = validationContext.getProperties();
+
+            // need to compare props, if any, against script-expected props that are required.
+            // script may be expecting required props that are not known, or some props may have invalid
+            // values.
+            // processor may be configured with dynamic props that the script will use...but does not declare which would
+            // be a bad thing
+            List<PropertyDescriptor> scriptPropDescs = s.getPropertyDescriptors();
+            getLogger().debug("Script is {}", new Object[]{s});
+            getLogger().debug("Script file name is {}", new Object[]{s.getFileName()});
+            getLogger().debug("Script Prop Descs are: {}", new Object[]{scriptPropDescs.toString()});
+            getLogger().debug("Thread is: {}", new Object[]{Thread.currentThread().toString()});
+            for (PropertyDescriptor propDesc : scriptPropDescs) {
+                if (propDesc.isRequired() && !properties.containsKey(propDesc)) {
+                    // required by the script but not configured
+                    results.add(new ValidationResult.Builder()
+                            .subject("Script Properties")
+                            .valid(false)
+                            .explanation("Missing Property " + propDesc.getName())
+                            .build());
+                } else if (properties.containsKey(propDesc)) {
+                    // validate the configured value against the script-provided validator
+                    String value = properties.get(propDesc);
+                    ValidationResult result = propDesc.validate(value, validationContext);
+                    if (!result.isValid()) {
+                        results.add(result);
+                    }
+                } // else it is an optional prop according to the script and it is not specified by
+                  // the configuration of the processor
+            }
+
+            // Rebuild from the built-in descriptors rather than appending to the current
+            // list, so repeated validations do not accumulate duplicate descriptors.
+            propertyDescriptors.set(buildPropertyDescriptors(scriptPropDescs));
+
+            if (results.isEmpty()) {
+                // so needed props are supplied and individually validated, now validate script
+                Collection<String> reasons = s.validate();
+                if (null == reasons) {
+                    getLogger().warn("Script had invalid return value for validate(), ignoring.");
+                } else {
+                    for (String reason : reasons) {
+                        results.add(new ValidationResult.Builder()
+                                .subject("ScriptValidation")
+                                .valid(false)
+                                .explanation(reason)
+                                .build());
+                    }
+                }
+            }
+
+            // get the exception route
+            exceptionRoute = s.getExceptionRoute();
+
+            if (!results.isEmpty()) {
+                // Keep re-checking until the problems are resolved; otherwise the next
+                // pass would skip the script checks and report the processor as valid.
+                doCustomValidate.set(true);
+            }
+            return results;
+        } catch (ScriptException | IOException | NoSuchMethodException e) {
+            doCustomValidate.set(true);
+            results.add(new ValidationResult.Builder()
+                    .subject("ScriptValidation")
+                    .valid(false)
+                    .explanation("Cannot create script due to " + e.getMessage())
+                    .input(file)
+                    .build());
+            getLogger().error("Cannot create script due to " + e, e);
+            return results;
+        }
+    }
+
+    /**
+     * Runs the configured script against the next incoming FlowFile.
+     * <p>
+     * After a ReaderScript or WriterScript runs, the script's attributes are
+     * applied and the FlowFile goes to the script's route, or is removed when
+     * that route is {@code null}. After a ConverterScript runs, the incoming
+     * FlowFile is removed. If the script cannot be created, a
+     * {@link ProcessException} is thrown; if a created script fails, the
+     * FlowFile is transferred to the exception route captured during validation.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return; // fail-fast if there is no work to do
+        }
+
+        final String scriptFileName = context.getProperty(SCRIPT_FILE_NAME).getValue();
+        // doing this cloning because getProperties does not initialize props that have only their default values
+        // must do a getProperty for that value to be initialized
+        Map<String, String> props = new HashMap<>();
+        for (PropertyDescriptor propDesc : context.getProperties().keySet()) {
+            if (propDesc.isExpressionLanguageSupported()) {
+                props.put(propDesc.getName(), context.getProperty(propDesc).evaluateAttributeExpressions(flowFile).getValue());
+            } else {
+                props.put(propDesc.getName(), context.getProperty(propDesc).getValue());
+            }
+        }
+        Script script = null;
+        try {
+            final Script finalScript = scriptFactory.getScript(scriptFileName, props, flowFile);
+            script = finalScript;
+            if (finalScript instanceof ReaderScript) {
+                session.read(flowFile, new InputStreamCallback() {
+
+                    @Override
+                    public void process(InputStream in) throws IOException {
+                        try {
+                            ((ReaderScript) finalScript).process(new BufferedInputStream(in));
+                        } catch (NoSuchMethodException | ScriptException e) {
+                            getLogger().error("Failed to execute ReaderScript", e);
+                            throw new IOException(e);
+                        }
+                    }
+                });
+            } else if (finalScript instanceof WriterScript) {
+                flowFile = session.write(flowFile, new StreamCallback() {
+
+                    @Override
+                    public void process(InputStream in, OutputStream out) throws IOException {
+                        // Flush the buffering wrapper handed to the script, not just `out`:
+                        // flushing `out` alone would leave bytes still held in the wrapper's
+                        // buffer unwritten whenever the script does not flush/close it itself.
+                        BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
+                        try {
+                            ((WriterScript) finalScript).process(new BufferedInputStream(in), bufferedOut);
+                            bufferedOut.flush();
+                        } catch (NoSuchMethodException | ScriptException e) {
+                            getLogger().error("Failed to execute WriterScript", e);
+                            throw new IOException(e);
+                        }
+                    }
+                });
+            } else if (finalScript instanceof ConverterScript) {
+                ((ConverterScript) finalScript).process(session);
+
+                // Note that these scripts don't pass the incoming FF through,
+                // they always create new outputs
+                session.remove(flowFile);
+                return;
+            } else {
+                // only thing we can do is assume script has already run and done its thing, so just transfer the incoming
+                // flowfile
+                getLogger().debug("Successfully executed script from {}", new Object[]{scriptFileName});
+            }
+
+            // update flow file attributes
+            flowFile = session.putAllAttributes(flowFile, finalScript.getAttributes());
+            Relationship route = finalScript.getRoute();
+            if (null == route) {
+                session.remove(flowFile);
+                getLogger().info("Removing flowfile {}", new Object[]{flowFile});
+            } else {
+                session.transfer(flowFile, route);
+                getLogger().info("Transferring flowfile {} to {}", new Object[]{flowFile, route});
+            }
+        } catch (ScriptException | IOException e) {
+            getLogger().error("Failed to create script from {} with flowFile {}. Rolling back session.",
+                    new Object[]{scriptFileName, flowFile}, e);
+            throw new ProcessException(e);
+        } catch (Exception e) {
+            if (null != script) {
+                getLogger().error("Failed to execute script from {}. Transferring flow file {} to {}",
+                        new Object[]{scriptFileName, flowFile, exceptionRoute}, e);
+                session.transfer(flowFile, exceptionRoute);
+            } else {
+                getLogger().error("Failed to execute script from {} with flowFile {}. Rolling back session",
+                        new Object[]{scriptFileName, flowFile}, e);
+                throw new ProcessException(e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java
new file mode 100644
index 0000000..7be47a8
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+/**
+ * <p>
+ * Script authors should extend this class if they want to perform complex
+ * conversions in a NiFi processor.
+ * </p>
+ *
+ * <p>
+ * Scripts must implement {@link #convert(InputStream)}. This method may
+ * create new FlowFiles and pass them to one or more routes. The input FlowFile
+ * will be removed from the repository after execution of this method completes.
+ * </p>
+ *
+ * <p>
+ * In general, {@link #convert(InputStream)} will read from the supplied
+ * stream, then create one or more output FlowFiles and route each one to the
+ * relationship of choice using
+ * {@link #createFlowFile(String, Relationship, OutputStreamHandler)}.
+ * </p>
+ *
+ * <p>
+ * Override {@link #getRelationships()} to allow writing to
+ * relationships other than <code>success</code> and <code>failure</code>. The
+ * {@link #getRoute()} superclass method is *not* used by Converter Scripts.
+ * </p>
+ *
+ */
+public class ConverterScript extends Script {
+
+    private ProcessSession session; // used to create files
+    // First Map callback (in constructor argument order) that has a "convert" entry; null if none.
+    private Object convertCallback;
+
+    public ConverterScript() {
+
+    }
+
+    /**
+     * Creates a script backed by callback objects, for scripting engines that
+     * cannot subclass this class directly. The first callback that is a
+     * {@link Map} containing a {@code "convert"} key is used by
+     * {@link #convert(InputStream)}.
+     *
+     * @param callbacks callback objects, also passed to the superclass constructor
+     */
+    public ConverterScript(Object... callbacks) {
+        super(callbacks);
+        for (Object callback : callbacks) {
+            if (convertCallback == null
+                    && callback instanceof Map<?, ?>
+                    && ((Map<?, ?>) callback).containsKey("convert")) {
+                convertCallback = callback;
+            }
+        }
+    }
+
+    /**
+     * Converts the incoming FlowFile's content. Subclasses should override this
+     * to define their logic. The default implementation invokes the
+     * {@code convert} function of the callback supplied at construction, if
+     * any, through the script engine, and otherwise does nothing.
+     *
+     * @param stream the incoming FlowFile's content
+     * @throws NoSuchMethodException if the callback has no invocable {@code convert}
+     * @throws ScriptException if the script function fails
+     */
+    protected void convert(InputStream stream) throws NoSuchMethodException, ScriptException {
+        if (convertCallback != null) {
+            ((Invocable) engine).invokeMethod(convertCallback, "convert", stream);
+        }
+    }
+
+    /**
+     * Owning processor uses this method to kick off handling of a single file:
+     * the incoming FlowFile's content is read through {@code aSession} and
+     * handed, buffered, to {@link #convert(InputStream)}. A failure in
+     * {@code convert} is logged and rethrown from the read callback as an
+     * {@link IOException}.
+     *
+     * @param aSession the owning processor's session (needed to make new
+     * files); retained for later {@code createFlowFile} calls
+     */
+    public void process(ProcessSession aSession) {
+        this.session = aSession;
+        this.session.read(this.flowFile, new InputStreamCallback() {
+
+            @Override
+            public void process(InputStream in) throws IOException {
+                BufferedInputStream stream = new BufferedInputStream(in);
+                try {
+                    convert(stream);
+                } catch (NoSuchMethodException | ScriptException e) {
+                    logger.error("Failed to execute 'convert' function in script", e);
+                    throw new IOException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates a child of the incoming FlowFile, sets its filename attribute to
+     * {@code flowFileName}, lets {@code handler} write its content and
+     * transfers it to {@code relationship}. Requires
+     * {@link #process(ProcessSession)} to have been called first, since that
+     * is where the session is set. Failures are logged and the child is
+     * removed; they are not propagated to the caller.
+     *
+     * @param flowFileName value for the new FlowFile's filename attribute
+     * @param relationship where the new FlowFile is transferred
+     * @param handler writes the new FlowFile's content
+     */
+    // this should go back to protected once we get Nashorn
+    public void createFlowFile(final String flowFileName, final Relationship relationship, final OutputStreamHandler handler) {
+        FlowFile result = session.create(this.flowFile);
+        result = session.putAttribute(result, CoreAttributes.FILENAME.key(), flowFileName);
+        try {
+            result = session.write(result, new OutputStreamCallback() {
+
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    handler.write(out);
+                }
+            });
+            this.logger.info("Transfer flow file {} to {}", new Object[]{result, relationship});
+            session.transfer(result, relationship);
+        } catch (Exception e) {
+            // best effort: drop the partially created child and let the script continue
+            this.logger.error("Could not create new flow file from script", e);
+            session.remove(result);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java
new file mode 100644
index 0000000..883b688
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+public enum JRubyScriptFactory {
+
+    INSTANCE;
+
+    /**
+     * Ruby prologue placed in front of every script: enables Java integration
+     * and imports the NiFi types that scripts commonly reference.
+     */
+    private static final String PRELOADS = "include Java\n"
+            + "\n"
+            + "java_import 'org.apache.nifi.components.PropertyDescriptor'\n"
+            + "java_import 'org.apache.nifi.components.Validator'\n"
+            + "java_import 'org.apache.nifi.processor.util.StandardValidators'\n"
+            + "java_import 'org.apache.nifi.processor.Relationship'\n"
+            + "java_import 'org.apache.nifi.logging.ProcessorLog'\n"
+            + "java_import 'org.apache.nifi.scripting.ReaderScript'\n"
+            + "java_import 'org.apache.nifi.scripting.WriterScript'\n"
+            + "java_import 'org.apache.nifi.scripting.ConverterScript'\n"
+            + "\n";
+
+    /**
+     * Loads a Ruby script and prefixes it with the import prologue.
+     *
+     * @param scriptFile the script to read, decoded as UTF-8
+     * @return the prologue followed by the script's source text
+     * @throws IOException if the file cannot be read
+     */
+    public String getScript(File scriptFile) throws IOException {
+        final String source = FileUtils.readFileToString(scriptFile, "UTF-8");
+        return PRELOADS.concat(source);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java
new file mode 100644
index 0000000..774fb1f
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+
+public enum JavaScriptScriptFactory {
+
+    INSTANCE;
+
+    /**
+     * JavaScript prologue placed in front of every script. It imports the NiFi
+     * scripting packages into a {@code Scripting} scope and defines
+     * {@code readFile(file)} and {@code require(file)}, which resolve
+     * {@code file} against {@code $PATH} (declared by {@link #getScript(File)}
+     * as the main script's directory). Files are read as UTF-8, the same
+     * encoding used for the main script.
+     */
+    private static final String PRELOADS = "var Scripting = JavaImporter(\n"
+            + " Packages.org.apache.nifi.components,\n"
+            + " Packages.org.apache.nifi.processor.util,\n"
+            + " Packages.org.apache.nifi.processor,\n"
+            + " Packages.org.apache.nifi.logging,\n"
+            + " Packages.org.apache.nifi.scripting,\n"
+            + " Packages.org.apache.commons.io);\n"
+            + "var readFile = function (file) {\n"
+            + " var script = Packages.org.apache.commons.io.FileUtils.readFileToString("
+            + " new java.io.File($PATH, file), \"UTF-8\""
+            + " );\n"
+            + " return \"\" + script;\n"
+            + "}\n"
+            + "var require = function (file){\n"
+            + " var exports={}, module={};\n"
+            + " module.__defineGetter__('id', function(){return file;});"
+            + " eval(readFile(file));\n"
+            + " return exports;\n"
+            + "}\n";
+
+    /**
+     * Loads a JavaScript script, prefixed with the prologue and a
+     * {@code $PATH} declaration holding the script's directory.
+     *
+     * @param scriptFile the script to read, decoded as UTF-8
+     * @return the prologue, the {@code $PATH} declaration and the script source
+     * @throws IOException if the file cannot be read
+     */
+    public String getScript(File scriptFile) throws IOException {
+        // Use the absolute file: getParent() is null for a bare name such as
+        // "script.js", which would otherwise render $PATH as "null".
+        File parentDir = scriptFile.getAbsoluteFile().getParentFile();
+        String parent = parentDir == null ? "" : parentDir.getPath();
+        // $PATH is emitted inside a JavaScript string literal: normalize separators to
+        // forward slashes (so no backslashes remain), then escape any double quotes.
+        parent = StringUtils.replace(parent, "\\", "/");
+        parent = StringUtils.replace(parent, "\"", "\\\"");
+        StringBuilder sb = new StringBuilder();
+        sb.append(PRELOADS).append("var $PATH = \"").append(parent).append("\"\n")
+                .append(FileUtils.readFileToString(scriptFile, "UTF-8"));
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java
new file mode 100644
index 0000000..6b40b5e
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+public enum JythonScriptFactory {
+
+ INSTANCE;
+
+ private final static String PRELOADS = "from org.python.core.util import FileUtil\n"
+ + "from org.apache.nifi.components import PropertyDescriptor\n"
+ + "from org.apache.nifi.components import Validator\n"
+ + "from org.apache.nifi.processor.util import StandardValidators\n"
+ + "from org.apache.nifi.processor import Relationship\n"
+ + "from org.apache.nifi.logging import ProcessorLog\n"
+ + "from org.apache.nifi.scripting import ReaderScript\n"
+ + "from org.apache.nifi.scripting import WriterScript\n"
+ + "from org.apache.nifi.scripting import ConverterScript\n";
+
+ public String getScript(File scriptFile) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append(PRELOADS)
+ .append(FileUtils.readFileToString(scriptFile, "UTF-8"));
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
new file mode 100644
index 0000000..d879722
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.OutputStream;
+
/**
 * A callback that is handed an {@link OutputStream} to write content to.
 */
public interface OutputStreamHandler {

    /**
     * Writes content to the supplied stream.
     *
     * @param outputStream the stream to write to
     */
    void write(OutputStream outputStream);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
new file mode 100644
index 0000000..b1d89c0
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
+import org.apache.nifi.processor.Relationship;
+
+/**
+ * <p>
+ * Script authors should extend this class if they want to follow the "reader"
+ * paradigm for NiFi processors.
+ * </p>
+ *
+ * <p>
+ * User scripts should implement {@link #route(InputStream)}. <code>route</code>
+ * uses a returned relationship name to determine where FlowFiles go. Scripts
+ * may also implement {@link #getProcessorRelationships()} to specify available
+ * relationship names.
+ * </p>
+ *
+ */
+public class ReaderScript extends Script {
+
+ private Object routeCallback;
+
+ public ReaderScript(Object... callbacks) {
+ super(callbacks);
+ for (Object callback : callbacks) {
+ if (callback instanceof Map<?, ?>) {
+ routeCallback = routeCallback == null && ((Map<?, ?>) callback).containsKey("route") ? callback : routeCallback;
+ }
+ }
+ }
+
+ public ReaderScript() {
+
+ }
+
+ // Simple helper
+ public void process(InputStream input) throws NoSuchMethodException, ScriptException {
+ lastRoute = route(input);
+ }
+
+ /**
+ * Subclasses should examine the provided inputstream, then determine which
+ * relationship the file will be sent down and return its name.
+ *
+ *
+ * @param in a Java InputStream containing the incoming FlowFile.
+ * @return a relationship name
+ * @throws ScriptException
+ * @throws NoSuchMethodException
+ */
+ public Relationship route(InputStream in) throws NoSuchMethodException, ScriptException {
+ Relationship relationship = null;
+ Invocable invocable = (Invocable) this.engine;
+ relationship = (Relationship) invocable.invokeMethod(routeCallback, "route", in);
+ return relationship;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
new file mode 100644
index 0000000..786f541
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.script.Invocable;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.Relationship;
+
+/**
+ * <p>
+ * Base class for all scripts. In this framework, only ScriptEngines that
+ * implement javax.script.Invocable are supported.
+ *
+ * </p>
+ *
+ */
+public class Script {
+
+ public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
+ .name("success")
+ .description("Destination of successfully created flow files")
+ .build();
+ public static final Relationship FAIL_RELATIONSHIP = new Relationship.Builder()
+ .name("failure")
+ .description("Destination of flow files when a error occurs in the script")
+ .build();
+
+ static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ Set<Relationship> rels = new HashSet<>();
+ rels.add(FAIL_RELATIONSHIP);
+ rels.add(SUCCESS_RELATIONSHIP);
+ RELATIONSHIPS = Collections.unmodifiableSet(rels);
+ }
+
+ FlowFile flowFile = null;
+ ScriptEngine engine = null;
+
+ protected Map<String, String> properties = new HashMap<>();
+ protected Relationship lastRoute = SUCCESS_RELATIONSHIP;
+ protected ProcessorLog logger;
+ protected String scriptFileName;
+ protected Map<String, String> attributes = new HashMap<>();
+ protected long flowFileSize = 0;
+ protected long flowFileEntryDate = System.currentTimeMillis();
+
+ // the following are needed due to an inadequate JavaScript ScriptEngine. It will not allow
+ // subclassing a Java Class, only implementing a Java Interface. So, the syntax of JavaScript
+ // scripts looks like subclassing, but actually is just constructing a Script instance and
+ // passing in functions as args to the constructor. When we move to Nashorn JavaScript ScriptEngine
+ // in Java 8, we can get rid of these and revert the subclasses of this class to abstract.
+ protected Object propDescCallback;
+ protected Object relationshipsCallback;
+ protected Object validateCallback;
+ protected Object exceptionRouteCallback;
+
+ /**
+ * Create a Script without any parameters
+ */
+ public Script() {
+ }
+
+ public Script(Object... callbacks) {
+ for (Object callback : callbacks) {
+ if (callback instanceof Map<?, ?>) {
+ propDescCallback = propDescCallback == null && ((Map<?, ?>) callback).containsKey("getPropertyDescriptors") ? callback
+ : propDescCallback;
+ relationshipsCallback = relationshipsCallback == null && ((Map<?, ?>) callback).containsKey("getRelationships") ? callback
+ : relationshipsCallback;
+ validateCallback = validateCallback == null && ((Map<?, ?>) callback).containsKey("validate") ? callback : validateCallback;
+ exceptionRouteCallback = exceptionRouteCallback == null && ((Map<?, ?>) callback).containsKey("getExceptionRoute") ? callback
+ : exceptionRouteCallback;
+ }
+ }
+ }
+
+ /**
+ * Specify a set of properties with corresponding NiFi validators.
+ *
+ * Subclasses that do not override this method will still have access to all
+ * properties via the "properties" field
+ *
+ * @return a list of PropertyDescriptors
+ * @throws ScriptException
+ * @throws NoSuchMethodException
+ */
+ @SuppressWarnings("unchecked")
+ public List<PropertyDescriptor> getPropertyDescriptors() throws NoSuchMethodException, ScriptException {
+ if (propDescCallback != null) {
+ return (List<PropertyDescriptor>) ((Invocable) engine).invokeMethod(propDescCallback, "getPropertyDescriptors", (Object) null);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Specify a set of reasons why this processor should be invalid.
+ *
+ * Subclasses that do not override this method will depend only on
+ * individual property validators as specified in
+ * {@link #getPropertyDescriptors()}.
+ *
+ * @return a Collection of messages to display to the user, or an empty
+ * Collection if the processor configuration is OK.
+ * @throws ScriptException
+ * @throws NoSuchMethodException
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<String> validate() throws NoSuchMethodException, ScriptException {
+ if (validateCallback != null) {
+ return (Collection<String>) ((Invocable) engine).invokeMethod(validateCallback, "validate", (Object) null);
+ }
+ return Collections.emptyList();
+ }
+
+ void setFlowFile(FlowFile ff) {
+ flowFile = ff;
+ if (null != ff) {
+ // have to clone because ff.getAttributes is unmodifiable
+ this.attributes = new HashMap<>(ff.getAttributes());
+ this.flowFileSize = ff.getSize();
+ this.flowFileEntryDate = ff.getEntryDate();
+ }
+ }
+
+ void setProperties(Map<String, String> map) {
+ properties = new HashMap<>(map);
+ }
+
+ /**
+ * Required to access entire properties map -- Jython (at least) won't let
+ * you read the member variable without a getter
+ *
+ * @return entire parameter map
+ */
+ // change back to protected when we get nashorn
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ /**
+ * Get the named parameter. Some scripting languages make a method call
+ * easier than accessing a member field, so this is a convenience method to
+ * look up values in the properties field.
+ *
+ * @param key a hash key
+ * @return the value pointed at by the key specified
+ */
+ public String getProperty(String key) {
+ return properties.get(key);
+ }
+
+ /**
+ * Name the various relationships by which a file can leave this processor.
+ * Subclasses may override this method to change available relationships.
+ *
+ * @return a collection of relationship names
+ * @throws ScriptException
+ * @throws NoSuchMethodException
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<Relationship> getRelationships() throws NoSuchMethodException, ScriptException {
+ if (relationshipsCallback != null) {
+ return (Collection<Relationship>) ((Invocable) engine).invokeMethod(relationshipsCallback, "getRelationships", (Object) null);
+ }
+ return RELATIONSHIPS;
+ }
+
+ /**
+ * Determine what do with a file that has just been processed.
+ *
+ * After a script runs its "read" or "write" method, it should update the
+ * "lastRoute" field to specify the relationship to which the resulting file
+ * will be sent.
+ *
+ * @return a relationship name
+ */
+ public Relationship getRoute() {
+ return lastRoute;
+ }
+
+ // Required because of a potential issue in Rhino -- protected methods are visible in
+ // subclasses but protected fields (like "lastRoute") are not
+ // change back to protected when we get nashorn
+ public void setRoute(Relationship route) {
+ lastRoute = route;
+ }
+
+ /**
+ * Determine where to send a file if an exception is thrown during
+ * processing.
+ *
+ * Subclasses may override this method to use a different relationship, or
+ * to determine the relationship dynamically. Returning null causes the file
+ * to be deleted instead.
+ *
+ * Defaults to "failure".
+ *
+ * @return the name of the relationship to use in event of an exception, or
+ * null to delete the file.
+ * @throws ScriptException
+ * @throws NoSuchMethodException
+ */
+ public Relationship getExceptionRoute() throws NoSuchMethodException, ScriptException {
+ if (exceptionRouteCallback != null) {
+ return (Relationship) ((Invocable) engine).invokeMethod(exceptionRouteCallback, "getExceptionRoute", (Object) null);
+ }
+ return FAIL_RELATIONSHIP;
+ }
+
+ /*
+ * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get
+ * the incoming flow file size.
+ */
+ // Change back to protected when we get nashorn
+ public long getFlowFileSize() {
+ return flowFileSize;
+ }
+
+ /*
+ * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get
+ * entry date of the flow file.
+ */
+ // Change back to protected when we get nashorn
+ public long getFlowFileEntryDate() {
+ return flowFileEntryDate;
+ }
+
+ void setLogger(ProcessorLog logger) {
+ this.logger = logger;
+ }
+
+ /*
+ * Required so that scripts in some languages can read access the attribute. Jython (at least) won't let you read the member
+ * variable without a getter
+ */
+ protected ProcessorLog getLogger() {
+ return this.logger;
+ }
+
+ void setFileName(String scriptFileName) {
+ this.scriptFileName = scriptFileName;
+ }
+
+ public String getFileName() {
+ return this.scriptFileName;
+ }
+
+ // this one's public because it's needed by ExecuteScript to update the flow file's attributes AFTER processing is done
+ public Map<String, String> getAttributes() {
+ return this.attributes;
+ }
+
+ /*
+ * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to look
+ * up values in the attributes field.
+ */
+ // Change back to protected when we get nashorn
+ public String getAttribute(String key) {
+ return this.attributes.get(key);
+ }
+
+ /*
+ * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to set
+ * key/value pairs in the attributes field.
+ */
+ // Change back to protected when we get nashorn
+ public void setAttribute(String key, String value) {
+ this.attributes.put(key, value);
+ }
+
+ void setEngine(ScriptEngine scriptEngine) {
+ this.engine = scriptEngine;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
new file mode 100644
index 0000000..6f38886
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.jruby.embed.PropertyName;
+
+public class ScriptEngineFactory {
+
+ private static final String THREADING = "THREADING";
+ private static final String MULTITHREADED = "MULTITHREADED";
+ private static final String STATELESS = "STATELESS";
+ private static final String THREAD_ISOLATED = "THREAD-ISOLATED";
+ final static ScriptEngineManager scriptEngMgr;
+
+ static {
+ System.setProperty(PropertyName.LOCALCONTEXT_SCOPE.toString(), "singlethread");
+ System.setProperty(PropertyName.COMPILEMODE.toString(), "jit");
+ System.setProperty(PropertyName.COMPATVERSION.toString(), "JRuby1.9");
+ System.setProperty(PropertyName.LOCALVARIABLE_BEHAVIOR.toString(), "transient");
+ System.setProperty("compile.invokedynamic", "false");
+ System.setProperty(PropertyName.LAZINESS.toString(), "true");
+ scriptEngMgr = new ScriptEngineManager();
+ }
+ final ConcurrentHashMap<String, ScriptEngine> threadSafeEngines = new ConcurrentHashMap<>();
+
+ ScriptEngine getEngine(String extension) {
+ ScriptEngine engine = threadSafeEngines.get(extension);
+ if (null == engine) {
+ engine = scriptEngMgr.getEngineByExtension(extension);
+ if (null == engine) {
+ throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension);
+ }
+
+ Object threading = engine.getFactory().getParameter(THREADING);
+ // the MULTITHREADED status means that the scripts need to be careful about sharing state
+ if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) {
+ ScriptEngine cachedEngine = threadSafeEngines.putIfAbsent(extension, engine);
+ if (null != cachedEngine) {
+ engine = cachedEngine;
+ }
+ }
+ }
+ return engine;
+ }
+
+ ScriptEngine getNewEngine(File scriptFile, String extension) throws ScriptException {
+ ScriptEngine engine = scriptEngMgr.getEngineByExtension(extension);
+ if (null == engine) {
+ throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension);
+ }
+ // Initialize some paths
+ StringBuilder sb = new StringBuilder();
+ switch (extension) {
+ case "rb":
+ String parent = scriptFile.getParent();
+ parent = StringUtils.replace(parent, "\\", "/");
+ sb.append("$:.unshift '")
+ .append(parent)
+ .append("'\n")
+ .append("$:.unshift File.join '")
+ .append(parent)
+ .append("', 'lib'\n");
+ engine.eval(sb.toString());
+
+ break;
+ case "py":
+ parent = scriptFile.getParent();
+ parent = StringUtils.replace(parent, "\\", "/");
+ String lib = parent + "/lib";
+ sb.append("import sys\n").append("sys.path.append('").append(parent)
+ .append("')\n").append("sys.path.append('")
+ .append(lib)
+ .append("')\n")
+ .append("__file__ = '")
+ .append(scriptFile.getAbsolutePath())
+ .append("'\n");
+ engine.eval(sb.toString());
+ break;
+ default:
+ break;
+ }
+
+ Object threading = engine.getFactory().getParameter(THREADING);
+ // the MULTITHREADED status means that the scripts need to be careful about sharing state
+ if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) {
+ // replace prior instance if any
+ threadSafeEngines.put(extension, engine);
+ }
+ return engine;
+ }
+
+ boolean isThreadSafe(String scriptExtension) {
+ return threadSafeEngines.containsKey(scriptExtension);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
new file mode 100644
index 0000000..da18606
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import javax.script.SimpleBindings;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.logging.ProcessorLog;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * While this is a 'factory', it is not a singleton because we want a factory
+ * per processor. This factory has state, all of which belong to only one
+ * processor.
+ *
+ */
+public class ScriptFactory {
+
+ private final ScriptEngineFactory engineFactory = new ScriptEngineFactory();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadLock readLock = lock.readLock();
+ private final WriteLock writeLock = lock.writeLock();
+ private final ProcessorLog logger;
+
+ private volatile CompiledScript compiledScript;
+ private volatile String scriptText;
+ private volatile byte[] md5Hash;
+ private volatile long lastTimeChecked;
+ private volatile String scriptFileName;
+ private volatile long scriptCheckIntervalMS = 15000;
+
+ public ScriptFactory(ProcessorLog logger) {
+ this.logger = logger;
+ }
+
+ public void setScriptCheckIntervalMS(long msecs) {
+ this.scriptCheckIntervalMS = msecs;
+ }
+
+ /**
+ * @param aScriptFileName
+ * @param properties
+ * @param flowFile
+ * @return
+ * @throws IOException
+ * @throws ScriptException
+ */
+ public Script getScript(final String aScriptFileName, final Map<String, String> properties, final FlowFile flowFile)
+ throws IOException, ScriptException {
+ final Script instance;
+ long now = System.currentTimeMillis();
+ readLock.lock();
+ try {
+ if (!aScriptFileName.equals(this.scriptFileName)) {
+ readLock.unlock();
+ writeLock.lock();
+ try {
+ if (!aScriptFileName.equals(this.scriptFileName)) {
+ // need to get brand new engine
+ compiledScript = null;
+ this.md5Hash = getMD5Hash(aScriptFileName);
+ this.lastTimeChecked = now;
+ this.scriptFileName = aScriptFileName;
+ updateEngine();
+ } // else another thread beat me to the change...so just get a script
+ } finally {
+ readLock.lock();
+ writeLock.unlock();
+ }
+ } else if (lastTimeChecked + scriptCheckIntervalMS < now) {
+ readLock.unlock();
+ writeLock.lock();
+ try {
+ if (lastTimeChecked + scriptCheckIntervalMS < now) {
+ byte[] md5 = getMD5Hash(this.scriptFileName);
+ if (!MessageDigest.isEqual(md5Hash, md5)) {
+ // need to get brand new engine
+ compiledScript = null;
+ updateEngine();
+ this.md5Hash = md5;
+ } // else no change to script, so just update time checked
+ this.lastTimeChecked = now;
+ } // else another thread beat me to the check...so just get a script
+ } finally {
+ readLock.lock();
+ writeLock.unlock();
+ }
+ }
+ try {
+ instance = getScriptInstance(properties);
+ instance.setFileName(this.scriptFileName);
+ instance.setProperties(properties);
+ instance.setLogger(logger);
+ instance.setFlowFile(flowFile);
+ } catch (ScriptException e) {
+ // need to reset state to enable re-initialization
+ this.lastTimeChecked = 0;
+ this.scriptFileName = null;
+ throw e;
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ return instance;
+
+ }
+
+ public Script getScript(String aScriptFileName) throws ScriptException, IOException {
+ Map<String, String> props = new HashMap<>();
+ return getScript(aScriptFileName, props, null);
+ }
+
+ private byte[] getMD5Hash(String aScriptFileName) throws FileNotFoundException, IOException {
+ byte[] messageDigest = null;
+ try (FileInputStream fis = new FileInputStream(aScriptFileName);
+ DigestInputStream dis = new DigestInputStream(new BufferedInputStream(fis), MessageDigest.getInstance("MD5"))) {
+
+ byte[] bytes = new byte[8192];
+ while (dis.read(bytes) != -1) {
+ // do nothing...just computing the md5 hash
+ }
+ messageDigest = dis.getMessageDigest().digest();
+ } catch (NoSuchAlgorithmException swallow) {
+ // MD5 is a legitimate format
+ }
+ return messageDigest;
+ }
+
+ private String getScriptText(File scriptFile, String extension) throws IOException {
+ final String script;
+ switch (extension) {
+ case "rb":
+ script = JRubyScriptFactory.INSTANCE.getScript(scriptFile);
+ break;
+
+ case "js":
+ script = JavaScriptScriptFactory.INSTANCE.getScript(scriptFile);
+ break;
+
+ case "py":
+ script = JythonScriptFactory.INSTANCE.getScript(scriptFile);
+ break;
+
+ default:
+ script = FileUtils.readFileToString(scriptFile);
+ }
+ return script;
+ }
+
+ private Script getScriptInstance(final Map<String, String> properties) throws ScriptException {
+
+ Map<String, Object> localThreadVariables = new HashMap<>();
+ final String extension = getExtension(scriptFileName);
+ String loggerVariableKey = getVariableName("GLOBAL", "logger", extension);
+ localThreadVariables.put(loggerVariableKey, logger);
+ String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension);
+ localThreadVariables.put(propertiesVariableKey, properties);
+ localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName);
+ final Bindings bindings = new SimpleBindings(localThreadVariables);
+ final ScriptEngine scriptEngine = engineFactory.getEngine(extension);
+ Script instance;
+ if (compiledScript == null) {
+ instance = (Script) scriptEngine.eval(scriptText, bindings);
+ if (instance == null) { // which it will be for python and also for local variables in javascript
+ instance = (Script) scriptEngine.eval("instance", bindings);
+ }
+ } else {
+ instance = (Script) compiledScript.eval(bindings);
+ if (instance == null) { // which it will be for python and also for local variables in javascript
+ instance = (Script) compiledScript.getEngine().eval("instance", bindings);
+ }
+ }
+ instance.setEngine(scriptEngine);
+ return instance;
+ }
+
+ /*
+ * Must have writeLock when calling this!!!!
+ */
+ private void updateEngine() throws IOException, ScriptException {
+ final String extension = getExtension(scriptFileName);
+ // if engine is thread safe, it's being reused...if it's a JrubyEngine it
+ File scriptFile = new File(this.scriptFileName);
+ ScriptEngine scriptEngine = engineFactory.getNewEngine(scriptFile, extension);
+ scriptText = getScriptText(scriptFile, extension);
+ Map<String, Object> localThreadVariables = new HashMap<>();
+ String loggerVariableKey = getVariableName("GLOBAL", "logger", extension);
+ localThreadVariables.put(loggerVariableKey, logger);
+ String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension);
+ localThreadVariables.put(propertiesVariableKey, new HashMap<String, String>());
+ localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName);
+ if (scriptEngine instanceof Compilable) {
+ Bindings bindings = new SimpleBindings(localThreadVariables);
+ scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
+ compiledScript = ((Compilable) scriptEngine).compile(scriptText);
+ }
+ logger.debug("Updating Engine!!");
+ }
+
+ private String getVariableName(String scope, String variableName, String extension) {
+ String result;
+ switch (extension) {
+ case "rb":
+ switch (scope) {
+ case "GLOBAL":
+ result = '$' + variableName;
+ break;
+ case "INSTANCE":
+ result = '@' + variableName;
+ break;
+ default:
+ result = variableName;
+ break;
+ }
+
+ break;
+
+ default:
+ result = variableName;
+ break;
+ }
+ return result;
+ }
+
+ private String getExtension(String aScriptFileName) {
+ int dotPos = aScriptFileName.lastIndexOf('.');
+ if (dotPos < 1) {
+ throw new IllegalArgumentException("Script file name must have an extension");
+ }
+ final String extension = aScriptFileName.substring(dotPos + 1);
+ return extension;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
new file mode 100644
index 0000000..7eef98b
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scripting;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
+/**
+ * <p>
+ * Script authors should extend this class if they want to follow the
+ * "processCallback" paradigm for NiFi processors.
+ * </p>
+ *
+ * <p>
+ * At a minimum, scripts must implement
+ * <code>process(InputStream, OutputStream)</code>, either by overriding
+ * {@link #process(InputStream, OutputStream)} or by passing to
+ * {@link #WriterScript(Object...)} a <code>Map</code> callback that contains a
+ * <code>process</code> key.
+ * </p>
+ *
+ * <p>
+ * By default, all files processed will be sent to the relationship
+ * <em>success</em>, unless the script raises an exception, in which
+ * case the file will be sent to <em>failure</em>. Implement
+ * {@link #getProcessorRelationships()} and/or {@link #getRoute()} to change
+ * this behavior.
+ * </p>
+ *
+ */
+public class WriterScript extends Script {
+
+    /** First Map callback containing a "process" key; null if none was supplied. */
+    private Object processCallback;
+
+    /**
+     * Creates a WriterScript without a process callback.
+     * NOTE(review): presumably used by script subclasses that override
+     * {@link #process(InputStream, OutputStream)}; the inherited implementation would
+     * pass a null callback to {@code Invocable.invokeMethod}.
+     */
+    public WriterScript() {
+        // nothing to initialize; processCallback stays null
+    }
+
+    /**
+     * Creates a WriterScript from script-supplied callbacks.
+     * <p>
+     * All callbacks are forwarded to the {@link Script} constructor. The first callback
+     * that is a {@link Map} containing the key {@code "process"} becomes the target of
+     * {@link #process(InputStream, OutputStream)}; any later matches are ignored.
+     *
+     * @param callbacks callback objects supplied by the script
+     */
+    public WriterScript(Object... callbacks) {
+        super(callbacks);
+        for (Object callback : callbacks) {
+            if (callback instanceof Map<?, ?> && ((Map<?, ?>) callback).containsKey("process")) {
+                processCallback = callback;
+                break;
+            }
+        }
+    }
+
+    /**
+     * Invokes the {@code process} method of the selected callback with the given streams,
+     * through the script engine's {@link Invocable} interface.
+     *
+     * @param in stream handed to the script's process callback
+     * @param out stream handed to the script's process callback
+     * @throws NoSuchMethodException if the callback has no invocable {@code process} method
+     * @throws ScriptException if the script raises an error while processing
+     * @throws ClassCastException if the script engine does not implement {@link Invocable}
+     */
+    public void process(InputStream in, OutputStream out) throws NoSuchMethodException, ScriptException {
+        Invocable inv = (Invocable) engine;
+        inv.invokeMethod(processCallback, "process", in, out);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..20a3982
--- /dev/null
+++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.script.ExecuteScript