You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:16:07 UTC
[33/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall
directory structure to make releasing nifi vs maven plugins easier
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
deleted file mode 100644
index d879722..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.scripting;
-
-import java.io.OutputStream;
-
-public interface OutputStreamHandler {
- /**
- * The only method of this callback interface: the implementation receives an
- * {@link OutputStream}. Judging by the method name it is expected to write
- * its output to that stream; the callers are not part of this file.
- *
- * @param out the stream handed to the implementation
- */
- void write(OutputStream out);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
deleted file mode 100644
index b1d89c0..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.scripting;
-
-import java.io.InputStream;
-import java.util.Map;
-
-import javax.script.Invocable;
-import javax.script.ScriptException;
-
-import org.apache.nifi.processor.Relationship;
-
-/**
- * <p>
- * Script authors should extend this class if they want to follow the "reader"
- * paradigm for NiFi processors.
- * </p>
- *
- * <p>
- * User scripts should implement {@link #route(InputStream)}. The
- * {@link Relationship} returned by <code>route</code> is stored by
- * {@link #process(InputStream)} as the last route. Scripts may also implement
- * {@link #getRelationships()} to specify the available relationships.
- * </p>
- *
- * <p>
- * As an alternative to subclassing, the varargs constructor accepts Maps; the
- * first Map that has a <code>route</code> key becomes the target of the default
- * {@link #route(InputStream)} implementation.
- * </p>
- *
- */
-public class ReaderScript extends Script {
- /**
- * Target object for the "route" invocation. Assigned only by the varargs
- * constructor; stays null when the no-argument constructor is used.
- */
- private Object routeCallback;
- /**
- * Registers callbacks. The arguments are first handed to the superclass
- * constructor; afterwards the first argument that is a Map containing the key
- * "route" is remembered as the route callback. Later matches are ignored
- * because the field is only assigned while it is still null.
- *
- * @param callbacks callback holders; arguments that are not Maps are skipped
- */
- public ReaderScript(Object... callbacks) {
- super(callbacks);
- for (Object callback : callbacks) {
- if (callback instanceof Map<?, ?>) {
- routeCallback = routeCallback == null && ((Map<?, ?>) callback).containsKey("route") ? callback : routeCallback;
- }
- }
- }
- /**
- * Creates an instance with no route callback registered.
- */
- public ReaderScript() {
-
- }
-
- /**
- * Simple helper: calls {@link #route(InputStream)} and stores the result in
- * <code>lastRoute</code>, the field returned by {@link #getRoute()}.
- *
- * @param input passed through to <code>route</code> unchanged
- * @throws NoSuchMethodException propagated from <code>route</code>
- * @throws ScriptException propagated from <code>route</code>
- */
- public void process(InputStream input) throws NoSuchMethodException, ScriptException {
- lastRoute = route(input);
- }
-
- /**
- * Subclasses should examine the provided inputstream, then determine which
- * relationship the file will be sent down and return it.
- *
- * This default implementation casts the engine to {@link Invocable} and
- * invokes the method named "route" on the registered route callback, passing
- * <code>in</code> as the only argument.
- *
- * NOTE(review): after the no-argument constructor the route callback is null,
- * so this default passes null as the invocation target; presumably subclasses
- * are expected to override this method in that case -- confirm.
- *
- * @param in a Java InputStream containing the incoming FlowFile.
- * @return the value produced by the callback, cast to {@link Relationship}
- * @throws ScriptException declared by {@link Invocable#invokeMethod}
- * @throws NoSuchMethodException declared by {@link Invocable#invokeMethod}
- */
- public Relationship route(InputStream in) throws NoSuchMethodException, ScriptException {
- Relationship relationship = null;
- Invocable invocable = (Invocable) this.engine;
- relationship = (Relationship) invocable.invokeMethod(routeCallback, "route", in);
- return relationship;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
deleted file mode 100644
index 786f541..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.scripting;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.script.Invocable;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.Relationship;
-
-/**
- * <p>
- * Base class for all scripts. In this framework, only ScriptEngines that
- * implement javax.script.Invocable are supported.
- *
- * </p>
- *
- * <p>
- * An instance carries the processor properties, a mutable copy of the flow
- * file attributes, the flow file size and entry date, the logger, the script
- * file name and the relationship chosen by the last run. Optional callback
- * objects (Maps passed to the varargs constructor) replace the default
- * implementations of {@link #getPropertyDescriptors()}, {@link #validate()},
- * {@link #getRelationships()} and {@link #getExceptionRoute()}.
- * </p>
- *
- */
-public class Script {
- /** Relationship named "success"; also the initial value of <code>lastRoute</code>. */
- public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
- .name("success")
- .description("Destination of successfully created flow files")
- .build();
- public static final Relationship FAIL_RELATIONSHIP = new Relationship.Builder() // default result of getExceptionRoute()
- .name("failure")
- .description("Destination of flow files when a error occurs in the script")
- .build();
- /**
- * Unmodifiable set holding the failure and success relationships; returned by
- * {@link #getRelationships()} when no relationships callback is registered.
- */
- static final Set<Relationship> RELATIONSHIPS;
-
- static {
- Set<Relationship> rels = new HashSet<>();
- rels.add(FAIL_RELATIONSHIP);
- rels.add(SUCCESS_RELATIONSHIP);
- RELATIONSHIPS = Collections.unmodifiableSet(rels);
- }
- /* Package-private state, assigned through setFlowFile(FlowFile) and setEngine(ScriptEngine). */
- FlowFile flowFile = null;
- ScriptEngine engine = null;
-
- protected Map<String, String> properties = new HashMap<>(); // replaced with a copy by setProperties(Map)
- protected Relationship lastRoute = SUCCESS_RELATIONSHIP; // read by getRoute(), written by setRoute(Relationship)
- protected ProcessorLog logger;
- protected String scriptFileName;
- protected Map<String, String> attributes = new HashMap<>(); // replaced with a copy of the flow file attributes by setFlowFile
- protected long flowFileSize = 0;
- protected long flowFileEntryDate = System.currentTimeMillis(); // overwritten by setFlowFile when a flow file is supplied
-
- // the following are needed due to an inadequate JavaScript ScriptEngine. It will not allow
- // subclassing a Java Class, only implementing a Java Interface. So, the syntax of JavaScript
- // scripts looks like subclassing, but actually is just constructing a Script instance and
- // passing in functions as args to the constructor. When we move to Nashorn JavaScript ScriptEngine
- // in Java 8, we can get rid of these and revert the subclasses of this class to abstract.
- protected Object propDescCallback; // Map containing the key "getPropertyDescriptors", or null
- protected Object relationshipsCallback; // Map containing the key "getRelationships", or null
- protected Object validateCallback; // Map containing the key "validate", or null
- protected Object exceptionRouteCallback; // Map containing the key "getExceptionRoute", or null
-
- /**
- * Create a Script without any parameters. All callback fields stay null, so
- * the default implementations of the callback-backed methods are used.
- */
- public Script() {
- }
- /**
- * Registers callbacks. For every argument that is a Map, each of the four
- * callback fields that is still null is set to that Map if the Map contains
- * the matching key ("getPropertyDescriptors", "getRelationships", "validate",
- * "getExceptionRoute"). One Map may therefore serve several callbacks, and for
- * each key the first Map containing it wins.
- *
- * @param callbacks callback holders; arguments that are not Maps are skipped
- */
- public Script(Object... callbacks) {
- for (Object callback : callbacks) {
- if (callback instanceof Map<?, ?>) {
- propDescCallback = propDescCallback == null && ((Map<?, ?>) callback).containsKey("getPropertyDescriptors") ? callback
- : propDescCallback;
- relationshipsCallback = relationshipsCallback == null && ((Map<?, ?>) callback).containsKey("getRelationships") ? callback
- : relationshipsCallback;
- validateCallback = validateCallback == null && ((Map<?, ?>) callback).containsKey("validate") ? callback : validateCallback;
- exceptionRouteCallback = exceptionRouteCallback == null && ((Map<?, ?>) callback).containsKey("getExceptionRoute") ? callback
- : exceptionRouteCallback;
- }
- }
- }
-
- /**
- * Specify a set of properties with corresponding NiFi validators.
- *
- * Subclasses that do not override this method will still have access to all
- * properties via the "properties" field
- *
- * When a callback is registered, its "getPropertyDescriptors" method is
- * invoked through the engine with a single null argument (the cast to Object
- * makes the null one argument rather than a null argument array) and the
- * result is cast without further checks.
- *
- * @return a list of PropertyDescriptors; an empty list when no callback is
- * registered
- * @throws ScriptException declared by {@link Invocable#invokeMethod}
- * @throws NoSuchMethodException declared by {@link Invocable#invokeMethod}
- */
- @SuppressWarnings("unchecked")
- public List<PropertyDescriptor> getPropertyDescriptors() throws NoSuchMethodException, ScriptException {
- if (propDescCallback != null) {
- return (List<PropertyDescriptor>) ((Invocable) engine).invokeMethod(propDescCallback, "getPropertyDescriptors", (Object) null);
- }
- return Collections.emptyList();
- }
-
- /**
- * Specify a set of reasons why this processor should be invalid.
- *
- * Subclasses that do not override this method will depend only on
- * individual property validators as specified in
- * {@link #getPropertyDescriptors()}.
- *
- * When a callback is registered, its "validate" method is invoked through the
- * engine with a single null argument.
- *
- * @return a Collection of messages to display to the user, or an empty
- * Collection if the processor configuration is OK.
- * @throws ScriptException declared by {@link Invocable#invokeMethod}
- * @throws NoSuchMethodException declared by {@link Invocable#invokeMethod}
- */
- @SuppressWarnings("unchecked")
- public Collection<String> validate() throws NoSuchMethodException, ScriptException {
- if (validateCallback != null) {
- return (Collection<String>) ((Invocable) engine).invokeMethod(validateCallback, "validate", (Object) null);
- }
- return Collections.emptyList();
- }
- /**
- * Stores the flow file. When it is non-null, its attributes are copied into a
- * new mutable map and its size and entry date are recorded; when it is null,
- * the previously held attributes, size and entry date are left as they were.
- *
- * @param ff the flow file to attach, may be null
- */
- void setFlowFile(FlowFile ff) {
- flowFile = ff;
- if (null != ff) {
- // have to clone because ff.getAttributes is unmodifiable
- this.attributes = new HashMap<>(ff.getAttributes());
- this.flowFileSize = ff.getSize();
- this.flowFileEntryDate = ff.getEntryDate();
- }
- }
- /**
- * Replaces the properties with a new HashMap copy of the given map.
- *
- * @param map the properties to copy; must not be null because it is passed to
- * the HashMap copy constructor
- */
- void setProperties(Map<String, String> map) {
- properties = new HashMap<>(map);
- }
-
- /**
- * Required to access entire properties map -- Jython (at least) won't let
- * you read the member variable without a getter
- *
- * @return entire parameter map (the internal map itself, not a copy)
- */
- // change back to protected when we get nashorn
- public Map<String, String> getProperties() {
- return properties;
- }
-
- /**
- * Get the named parameter. Some scripting languages make a method call
- * easier than accessing a member field, so this is a convenience method to
- * look up values in the properties field.
- *
- * @param key a hash key
- * @return the value pointed at by the key specified, or null if the
- * properties map has no mapping for it
- */
- public String getProperty(String key) {
- return properties.get(key);
- }
-
- /**
- * List the various relationships by which a file can leave this processor.
- * Subclasses may override this method to change available relationships.
- *
- * When a callback is registered, its "getRelationships" method is invoked
- * through the engine with a single null argument.
- *
- * @return a collection of relationships; the failure and success
- * relationships when no callback is registered
- * @throws ScriptException declared by {@link Invocable#invokeMethod}
- * @throws NoSuchMethodException declared by {@link Invocable#invokeMethod}
- */
- @SuppressWarnings("unchecked")
- public Collection<Relationship> getRelationships() throws NoSuchMethodException, ScriptException {
- if (relationshipsCallback != null) {
- return (Collection<Relationship>) ((Invocable) engine).invokeMethod(relationshipsCallback, "getRelationships", (Object) null);
- }
- return RELATIONSHIPS;
- }
-
- /**
- * Determine what to do with a file that has just been processed.
- *
- * After a script runs its "read" or "write" method, it should update the
- * "lastRoute" field to specify the relationship to which the resulting file
- * will be sent.
- *
- * @return the current value of the "lastRoute" field ("success" unless it has
- * been changed)
- */
- public Relationship getRoute() {
- return lastRoute;
- }
-
- // Required because of a potential issue in Rhino -- protected methods are visible in
- // subclasses but protected fields (like "lastRoute") are not
- // change back to protected when we get nashorn
- public void setRoute(Relationship route) {
- lastRoute = route;
- }
-
- /**
- * Determine where to send a file if an exception is thrown during
- * processing.
- *
- * Subclasses may override this method to use a different relationship, or
- * to determine the relationship dynamically. Returning null causes the file
- * to be deleted instead.
- *
- * Defaults to "failure". When a callback is registered, its
- * "getExceptionRoute" method is invoked through the engine with a single null
- * argument instead.
- *
- * @return the relationship to use in event of an exception, or
- * null to delete the file.
- * @throws ScriptException declared by {@link Invocable#invokeMethod}
- * @throws NoSuchMethodException declared by {@link Invocable#invokeMethod}
- */
- public Relationship getExceptionRoute() throws NoSuchMethodException, ScriptException {
- if (exceptionRouteCallback != null) {
- return (Relationship) ((Invocable) engine).invokeMethod(exceptionRouteCallback, "getExceptionRoute", (Object) null);
- }
- return FAIL_RELATIONSHIP;
- }
-
- /*
- * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get
- * the incoming flow file size.
- */
- // Change back to protected when we get nashorn
- public long getFlowFileSize() {
- return flowFileSize;
- }
-
- /*
- * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get
- * entry date of the flow file.
- */
- // Change back to protected when we get nashorn
- public long getFlowFileEntryDate() {
- return flowFileEntryDate;
- }
- /* Stores the logger that getLogger() hands to scripts. */
- void setLogger(ProcessorLog logger) {
- this.logger = logger;
- }
-
- /*
- * Required so that scripts in some languages can read the logger. Jython (at least) won't let you read the member
- * variable without a getter
- */
- protected ProcessorLog getLogger() {
- return this.logger;
- }
- /* Records the script file name returned by getFileName(). */
- void setFileName(String scriptFileName) {
- this.scriptFileName = scriptFileName;
- }
-
- public String getFileName() {
- return this.scriptFileName;
- }
-
- // this one's public because it's needed by ExecuteScript to update the flow file's attributes AFTER processing is done
- // (returns the internal mutable map, not a copy)
- public Map<String, String> getAttributes() {
- return this.attributes;
- }
-
- /*
- * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to look
- * up values in the attributes field.
- */
- // Change back to protected when we get nashorn
- public String getAttribute(String key) {
- return this.attributes.get(key);
- }
-
- /*
- * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to set
- * key/value pairs in the attributes field.
- */
- // Change back to protected when we get nashorn
- public void setAttribute(String key, String value) {
- this.attributes.put(key, value);
- }
- /* Stores the engine that the callback-backed methods cast to Invocable. */
- void setEngine(ScriptEngine scriptEngine) {
- this.engine = scriptEngine;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
deleted file mode 100644
index 6f38886..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.scripting;
-
-import java.io.File;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.script.ScriptEngine;
-import javax.script.ScriptEngineManager;
-import javax.script.ScriptException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.jruby.embed.PropertyName;
-
-public class ScriptEngineFactory {
- /*
- * Key passed to javax.script.ScriptEngineFactory#getParameter, followed by the
- * three values for which getEngine/getNewEngine cache the engine for reuse.
- */
- private static final String THREADING = "THREADING";
- private static final String MULTITHREADED = "MULTITHREADED";
- private static final String STATELESS = "STATELESS";
- private static final String THREAD_ISOLATED = "THREAD-ISOLATED";
- final static ScriptEngineManager scriptEngMgr;
- /*
- * Sets JVM-wide system properties (keys taken from org.jruby.embed.PropertyName,
- * plus "compile.invokedynamic") and only then creates the shared
- * ScriptEngineManager -- presumably so that the JRuby engine picks them up.
- */
- static {
- System.setProperty(PropertyName.LOCALCONTEXT_SCOPE.toString(), "singlethread");
- System.setProperty(PropertyName.COMPILEMODE.toString(), "jit");
- System.setProperty(PropertyName.COMPATVERSION.toString(), "JRuby1.9");
- System.setProperty(PropertyName.LOCALVARIABLE_BEHAVIOR.toString(), "transient");
- System.setProperty("compile.invokedynamic", "false");
- System.setProperty(PropertyName.LAZINESS.toString(), "true");
- scriptEngMgr = new ScriptEngineManager();
- }
- final ConcurrentHashMap<String, ScriptEngine> threadSafeEngines = new ConcurrentHashMap<>(); // file extension -> cached engine
- /**
- * Returns an engine for the given script file extension.
- *
- * A cached engine is returned when one exists. Otherwise a new engine is
- * requested from the manager; it is added to the cache only when its factory
- * reports a THREADING value of THREAD-ISOLATED, STATELESS or MULTITHREADED.
- * For any other THREADING value nothing is cached, so every call creates a
- * fresh engine. If another thread cached an engine first, that engine is
- * returned instead of the one just created.
- *
- * @param extension script file extension without the dot
- * @return a cached or newly created engine, never null
- * @throws IllegalArgumentException if the manager has no engine for the
- * extension
- */
- ScriptEngine getEngine(String extension) {
- ScriptEngine engine = threadSafeEngines.get(extension);
- if (null == engine) {
- engine = scriptEngMgr.getEngineByExtension(extension);
- if (null == engine) {
- throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension);
- }
-
- Object threading = engine.getFactory().getParameter(THREADING);
- // the MULTITHREADED status means that the scripts need to be careful about sharing state
- if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) {
- ScriptEngine cachedEngine = threadSafeEngines.putIfAbsent(extension, engine);
- if (null != cachedEngine) {
- engine = cachedEngine;
- }
- }
- }
- return engine;
- }
- /**
- * Always creates a new engine for the extension and prepares its load path.
- *
- * For "rb" the script's parent directory and its "lib" subdirectory are put
- * at the front of Ruby's load path; for "py" both are appended to sys.path
- * and __file__ is set to the script's absolute path. Other extensions get no
- * bootstrap code. If the engine reports one of the cacheable THREADING
- * values, it replaces any engine previously cached for the extension.
- *
- * NOTE(review): the paths are spliced into the bootstrap source between single
- * quotes without escaping, so a path containing a quote would break it.
- * NOTE(review): backslashes are replaced in the parent path but not in the
- * absolute path used for __file__ -- confirm this is intended.
- * NOTE(review): File#getParent() can return null for a path without a parent;
- * that case is not handled here.
- *
- * @param scriptFile the script file whose directory is added to the load path
- * @param extension script file extension without the dot
- * @return the newly created engine
- * @throws ScriptException if evaluating the bootstrap code fails
- * @throws IllegalArgumentException if the manager has no engine for the
- * extension
- */
- ScriptEngine getNewEngine(File scriptFile, String extension) throws ScriptException {
- ScriptEngine engine = scriptEngMgr.getEngineByExtension(extension);
- if (null == engine) {
- throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension);
- }
- // Initialize some paths
- StringBuilder sb = new StringBuilder();
- switch (extension) {
- case "rb":
- String parent = scriptFile.getParent();
- parent = StringUtils.replace(parent, "\\", "/");
- sb.append("$:.unshift '")
- .append(parent)
- .append("'\n")
- .append("$:.unshift File.join '")
- .append(parent)
- .append("', 'lib'\n");
- engine.eval(sb.toString());
-
- break;
- case "py":
- parent = scriptFile.getParent(); // reuses the local declared in the "rb" case
- parent = StringUtils.replace(parent, "\\", "/");
- String lib = parent + "/lib";
- sb.append("import sys\n").append("sys.path.append('").append(parent)
- .append("')\n").append("sys.path.append('")
- .append(lib)
- .append("')\n")
- .append("__file__ = '")
- .append(scriptFile.getAbsolutePath())
- .append("'\n");
- engine.eval(sb.toString());
- break;
- default:
- break;
- }
-
- Object threading = engine.getFactory().getParameter(THREADING);
- // the MULTITHREADED status means that the scripts need to be careful about sharing state
- if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) {
- // replace prior instance if any
- threadSafeEngines.put(extension, engine);
- }
- return engine;
- }
- /**
- * Reports whether an engine is currently cached for the extension. This is
- * only true after getEngine or getNewEngine has created an engine for that
- * extension whose factory reported a cacheable THREADING value.
- *
- * @param scriptExtension script file extension without the dot
- * @return true if the cache holds an engine for the extension
- */
- boolean isThreadSafe(String scriptExtension) {
- return threadSafeEngines.containsKey(scriptExtension);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
deleted file mode 100644
index da18606..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.scripting;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.DigestInputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import javax.script.Bindings;
-import javax.script.Compilable;
-import javax.script.CompiledScript;
-import javax.script.ScriptContext;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
-import javax.script.SimpleBindings;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.io.BufferedInputStream;
-import org.apache.nifi.logging.ProcessorLog;
-
-import org.apache.commons.io.FileUtils;
-
-/**
- * While this is a 'factory', it is not a singleton because we want a factory
- * per processor. This factory has state, all of which belong to only one
- * processor.
- *
- * The state describes the most recently requested script file: its name, its
- * text, its compiled form (when the engine is Compilable) and the MD5 hash of
- * the file. The script is reloaded when a different file name is requested or
- * when, at most once per check interval (15000 ms by default), the file's MD5
- * hash is found to have changed.
- *
- */
-public class ScriptFactory {
-
- private final ScriptEngineFactory engineFactory = new ScriptEngineFactory();
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final ReadLock readLock = lock.readLock();
- private final WriteLock writeLock = lock.writeLock();
- private final ProcessorLog logger;
- /*
- * State of the currently loaded script. It is written while the write lock is
- * held, except for the reset in the catch block of getScript, which runs while
- * only the read lock is held.
- */
- private volatile CompiledScript compiledScript;
- private volatile String scriptText;
- private volatile byte[] md5Hash;
- private volatile long lastTimeChecked;
- private volatile String scriptFileName;
- private volatile long scriptCheckIntervalMS = 15000;
-
- public ScriptFactory(ProcessorLog logger) {
- this.logger = logger;
- }
- /** Sets the minimum time, in milliseconds, between two MD5 checks of the script file. */
- public void setScriptCheckIntervalMS(long msecs) {
- this.scriptCheckIntervalMS = msecs;
- }
-
- /**
- * Returns a script instance for the given file, (re)loading the script first
- * when necessary.
- *
- * If the file name differs from the one currently loaded, the file is hashed
- * and the engine, script text and compiled script are rebuilt. Otherwise, if
- * the check interval has elapsed, the file is re-hashed and the engine is
- * rebuilt only when the hash changed. Both updates happen under the write
- * lock; the instance itself is created under the read lock and is then given
- * the file name, properties, logger and flow file.
- *
- * If creating the instance throws a ScriptException, the recorded file name
- * and check time are reset so that the next call reloads the script, and the
- * exception is rethrown.
- *
- * NOTE(review): when the file name changes, the new name and hash are stored
- * before updateEngine() runs; if updateEngine() throws, they stay stored while
- * the compiled script is null -- confirm that later calls recover from this.
- *
- * @param aScriptFileName path of the script file; must not be null and must
- * have an extension
- * @param properties properties bound into the script and stored on the
- * instance
- * @param flowFile flow file attached to the instance, may be null
- * @return the configured script instance
- * @throws IOException if the script file cannot be read
- * @throws ScriptException if the engine fails to set up, compile or evaluate
- * the script
- */
- public Script getScript(final String aScriptFileName, final Map<String, String> properties, final FlowFile flowFile)
- throws IOException, ScriptException {
- final Script instance;
- long now = System.currentTimeMillis();
- readLock.lock();
- try {
- if (!aScriptFileName.equals(this.scriptFileName)) {
- readLock.unlock(); // the read lock cannot be upgraded, so release it before taking the write lock
- writeLock.lock();
- try {
- if (!aScriptFileName.equals(this.scriptFileName)) {
- // need to get brand new engine
- compiledScript = null;
- this.md5Hash = getMD5Hash(aScriptFileName);
- this.lastTimeChecked = now;
- this.scriptFileName = aScriptFileName;
- updateEngine();
- } // else another thread beat me to the change...so just get a script
- } finally {
- readLock.lock(); // downgrade: re-take the read lock before giving up the write lock
- writeLock.unlock();
- }
- } else if (lastTimeChecked + scriptCheckIntervalMS < now) {
- readLock.unlock();
- writeLock.lock();
- try {
- if (lastTimeChecked + scriptCheckIntervalMS < now) {
- byte[] md5 = getMD5Hash(this.scriptFileName);
- if (!MessageDigest.isEqual(md5Hash, md5)) {
- // need to get brand new engine
- compiledScript = null;
- updateEngine();
- this.md5Hash = md5;
- } // else no change to script, so just update time checked
- this.lastTimeChecked = now;
- } // else another thread beat me to the check...so just get a script
- } finally {
- readLock.lock();
- writeLock.unlock();
- }
- }
- try {
- instance = getScriptInstance(properties);
- instance.setFileName(this.scriptFileName);
- instance.setProperties(properties);
- instance.setLogger(logger);
- instance.setFlowFile(flowFile);
- } catch (ScriptException e) {
- // need to reset state to enable re-initialization
- this.lastTimeChecked = 0;
- this.scriptFileName = null;
- throw e;
- }
- } finally {
- readLock.unlock();
- }
-
- return instance;
-
- }
- /**
- * Convenience overload that passes an empty properties map and no flow file.
- */
- public Script getScript(String aScriptFileName) throws ScriptException, IOException {
- Map<String, String> props = new HashMap<>();
- return getScript(aScriptFileName, props, null);
- }
- /*
- * Reads the whole file through a DigestInputStream and returns its MD5 digest.
- * NOTE(review): if the MD5 algorithm were unavailable the exception is
- * swallowed and null is returned.
- */
- private byte[] getMD5Hash(String aScriptFileName) throws FileNotFoundException, IOException {
- byte[] messageDigest = null;
- try (FileInputStream fis = new FileInputStream(aScriptFileName);
- DigestInputStream dis = new DigestInputStream(new BufferedInputStream(fis), MessageDigest.getInstance("MD5"))) {
-
- byte[] bytes = new byte[8192];
- while (dis.read(bytes) != -1) {
- // do nothing...just computing the md5 hash
- }
- messageDigest = dis.getMessageDigest().digest();
- } catch (NoSuchAlgorithmException swallow) {
- // MD5 is a legitimate format
- }
- return messageDigest;
- }
- /*
- * Produces the script source for the file. For "rb", "js" and "py" this is
- * delegated to the language-specific factory singleton; any other extension
- * is read as-is with FileUtils.readFileToString (no charset is passed --
- * presumably the platform default applies; confirm).
- */
- private String getScriptText(File scriptFile, String extension) throws IOException {
- final String script;
- switch (extension) {
- case "rb":
- script = JRubyScriptFactory.INSTANCE.getScript(scriptFile);
- break;
-
- case "js":
- script = JavaScriptScriptFactory.INSTANCE.getScript(scriptFile);
- break;
-
- case "py":
- script = JythonScriptFactory.INSTANCE.getScript(scriptFile);
- break;
-
- default:
- script = FileUtils.readFileToString(scriptFile);
- }
- return script;
- }
- /*
- * Evaluates the loaded script with per-call bindings (logger, properties and
- * ScriptEngine.FILENAME) and returns the resulting Script. The compiled script
- * is used when there is one, otherwise the script text is evaluated. When the
- * evaluation itself yields null, the variable named "instance" is evaluated
- * instead. The engine obtained from the engine factory is stored on the
- * instance.
- * NOTE(review): if "instance" also evaluates to null, setEngine is called on a
- * null reference.
- */
- private Script getScriptInstance(final Map<String, String> properties) throws ScriptException {
-
- Map<String, Object> localThreadVariables = new HashMap<>();
- final String extension = getExtension(scriptFileName);
- String loggerVariableKey = getVariableName("GLOBAL", "logger", extension);
- localThreadVariables.put(loggerVariableKey, logger);
- String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension);
- localThreadVariables.put(propertiesVariableKey, properties);
- localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName);
- final Bindings bindings = new SimpleBindings(localThreadVariables);
- final ScriptEngine scriptEngine = engineFactory.getEngine(extension);
- Script instance;
- if (compiledScript == null) {
- instance = (Script) scriptEngine.eval(scriptText, bindings);
- if (instance == null) { // which it will be for python and also for local variables in javascript
- instance = (Script) scriptEngine.eval("instance", bindings);
- }
- } else {
- instance = (Script) compiledScript.eval(bindings);
- if (instance == null) { // which it will be for python and also for local variables in javascript
- instance = (Script) compiledScript.getEngine().eval("instance", bindings);
- }
- }
- instance.setEngine(scriptEngine);
- return instance;
- }
-
- /*
- * Must have writeLock when calling this!!!!
- *
- * Requests a new engine for the current script file, reloads the script text
- * and, when the engine is Compilable, installs engine-scope bindings (logger,
- * an empty properties map, the file name) and compiles the text.
- */
- private void updateEngine() throws IOException, ScriptException {
- final String extension = getExtension(scriptFileName);
- // always request a new engine; getNewEngine also replaces any engine cached for this extension
- File scriptFile = new File(this.scriptFileName);
- ScriptEngine scriptEngine = engineFactory.getNewEngine(scriptFile, extension);
- scriptText = getScriptText(scriptFile, extension);
- Map<String, Object> localThreadVariables = new HashMap<>();
- String loggerVariableKey = getVariableName("GLOBAL", "logger", extension);
- localThreadVariables.put(loggerVariableKey, logger);
- String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension);
- localThreadVariables.put(propertiesVariableKey, new HashMap<String, String>());
- localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName);
- if (scriptEngine instanceof Compilable) {
- Bindings bindings = new SimpleBindings(localThreadVariables);
- scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
- compiledScript = ((Compilable) scriptEngine).compile(scriptText);
- }
- logger.debug("Updating Engine!!");
- }
- /*
- * Maps a variable name to the binding key used for the given language. For
- * "rb" a GLOBAL name is prefixed with '$' and an INSTANCE name with '@'; for
- * every other extension or scope the name is returned unchanged.
- */
- private String getVariableName(String scope, String variableName, String extension) {
- String result;
- switch (extension) {
- case "rb":
- switch (scope) {
- case "GLOBAL":
- result = '$' + variableName;
- break;
- case "INSTANCE":
- result = '@' + variableName;
- break;
- default:
- result = variableName;
- break;
- }
-
- break;
-
- default:
- result = variableName;
- break;
- }
- return result;
- }
- /*
- * Returns the text after the last '.' of the file name. Throws
- * IllegalArgumentException when there is no '.' or the only '.' is the first
- * character.
- * NOTE(review): the search covers the whole path, so for an extension-less
- * file inside a directory whose name contains a '.', the result is the rest of
- * the path after that dot.
- */
- private String getExtension(String aScriptFileName) {
- int dotPos = aScriptFileName.lastIndexOf('.');
- if (dotPos < 1) {
- throw new IllegalArgumentException("Script file name must have an extension");
- }
- final String extension = aScriptFileName.substring(dotPos + 1);
- return extension;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
deleted file mode 100644
index 7eef98b..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.scripting;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-import javax.script.Invocable;
-import javax.script.ScriptException;
-
-/**
- * <p>
- * Base class for scripts that follow the "processCallback" paradigm for NiFi
- * processors.
- * </p>
- *
- * <p>
- * At a minimum, scripts must implement
- * <code>process(InputStream, OutputStream)</code>.
- * </p>
- *
- * <p>
- * By default, all files processed will be sent to the relationship
- * <em>success</em>, unless the script raises an exception, in which
- * case the file will be sent to <em>failure</em>. Implement
- * {@link #getProcessorRelationships()} and/or {@link #getRoute()} to change
- * this behavior.
- * </p>
- *
- */
-public class WriterScript extends Script {
-
-    /** First callback map that defines a "process" entry; remains null when none was supplied. */
-    private Object processCallback;
-
-    /** Creates a script with no callback object. */
-    public WriterScript() {
-    }
-
-    /**
-     * Creates a script backed by callback objects. The first callback that is a
-     * {@link Map} containing the key "process" becomes the target of
-     * {@link #process(InputStream, OutputStream)}.
-     *
-     * @param callbacks callback objects; also passed to the superclass constructor
-     */
-    public WriterScript(Object... callbacks) {
-        super(callbacks);
-        for (Object candidate : callbacks) {
-            if (candidate instanceof Map<?, ?> && ((Map<?, ?>) candidate).containsKey("process")) {
-                processCallback = candidate;
-                // Only the first matching callback is used, so later ones need not be inspected.
-                break;
-            }
-        }
-    }
-
-    /**
-     * Invokes the "process" method of the registered callback through the script engine.
-     *
-     * @param in stream handed to the script as its input
-     * @param out stream handed to the script as its output
-     * @throws NoSuchMethodException if the engine cannot find a matching "process" method
-     * @throws ScriptException if an error occurs while the script method runs
-     */
-    public void process(InputStream in, OutputStream out) throws NoSuchMethodException, ScriptException {
-        ((Invocable) engine).invokeMethod(processCallback, "process", in, out);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index 20a3982..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.script.ExecuteScript
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html
deleted file mode 100644
index acb47c5..0000000
--- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html
+++ /dev/null
@@ -1,264 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <head>
- <meta charset="utf-8" />
- <title>ExecuteScript</title>
-
- <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
- </head>
-
- <body>
- <!-- Processor Documentation ================================================== -->
- <h2>Description:</h2>
- <p>
- This processor provides the capability to execute scripts in various scripting languages, and passes into the scripts
- the input stream and output stream(s) representing an incoming flow file and any created flow files. The processor is designed to be
- thread safe, so multiple concurrent tasks may execute against a single script. The processor provides a framework which enables
- script writers to implement 3 different types of scripts:
- <ul>
- <li>ReaderScript - which enables stream-based reading of a FlowFile's content</li>
- <li>WriterScript - which enables stream-based reading and writing/modifying of a FlowFile's content</li>
- <li>ConverterScript - which enables stream-based reading of a FlowFile's content and stream-based writing to newly created FlowFiles</li>
- </ul>
- Presently, the processor supports 3 scripting languages: Ruby, Python, and JavaScript. The processor is built on the
- javax.script API which enables ScriptEngine discovery, thread management, and encapsulates much of the low level bridging-code that
- enables Java to Script language integration. Thus, it is designed to be easily extended to other scripting languages. <br/>
- The attributes of a FlowFile and properties of the Processor are exposed to the script by either a variable in the base class or
- a getter method. A script may declare new Processor Properties and different Relationships via overriding the getPropertyDescriptors
- and getRelationships methods, respectively.
- </p>
- The processor provides some boilerplate script to aid in the creation of the three different types of scripts. For example,
- the processor provides import statements for classes commonly used within a processor.
- <pre>
- 'org.apache.nifi.components.PropertyDescriptor'
- 'org.apache.nifi.components.Validator'
- 'org.apache.nifi.processor.util.StandardValidators'
- 'org.apache.nifi.processor.Relationship'
- 'org.apache.nifi.logging.ProcessorLog'
- 'org.apache.nifi.scripting.ReaderScript'
- 'org.apache.nifi.scripting.WriterScript'
- 'org.apache.nifi.scripting.ConverterScript'
- </pre>
- <p>The processor appends to the script's execution path the parent directory of the specified script file and a sub-directory
- called 'lib', which may be useful for supporting scripts. </p>
-<p>
- <strong>Shared Variables</strong>
-</p>
-The following variables are provided as shared variables for the scripts:
-<ul>
- <li>logger
- <ul>
- <li> The processor's logger </li>
- <li> Scope is GLOBAL, thus in Ruby the syntax is $logger</li>
- </ul>
- </li>
- <li>properties
- <ul>
- <li> A Map of the processor's configuration properties; key and value are strings</li>
- <li> Scope is INSTANCE, thus in Ruby the syntax is @properties</li>
- </ul>
- </li>
-</ul>
-<p>
- <strong>Properties:</strong>
-</p>
-<p>
- In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered
- optional. If a property has a default value, it is indicated. If a property supports the use of the NiFi Expression Language
- (or simply, "expression language"), that is also indicated. Of particular note: This processor allows scripts to define additional
- Processor properties, which will not be initially visible. Once the processor's configuration is validated, script defined properties
- will become visible, and may affect the validity of the processor.
-</p>
-<ul>
- <li>
- <strong>Script File Name</strong>
- <ul>
- <li>Script location, can be relative or absolute path.</li>
- <li>Default value: no default</li>
- <li>Supports expression language: false</li>
- </ul>
- </li>
- <li>
- <strong>Script Check Interval</strong>
- <ul>
- <li>The time period between checking for updates to a script.</li>
- <li>Default value: 15 sec</li>
- <li>Supports expression language: false</li>
- </ul>
- </li>
-</ul>
-
-<p>
- <strong>Relationships:</strong>
-</p>
-<p>
- The initial 'out of the box' relationships are below. Of particular note is the ability of a script to change the set of
- relationships. However, any relationships defined by the script will not be visible until the processor's configuration has been
- validated. Once done, new relationships will become visible.
-</p>
-<ul>
- <li>
- success
- <ul>
- <li>Used when a file is successfully processed by a script.</li>
- </ul>
- </li>
- <li>
- failure
- <ul>
- <li>Used when an error occurs while processing a file with a script.</li>
- </ul>
- </li>
-</ul>
-
-<p>
- <strong>Example Scripts:</strong>
-</p>
-<ul>
- <p>JavaScript example - the 'with' statement imports packages defined in the framework and limits the importing to the local scope,
- rather than global. The 'Scripting' variable uses the JavaImporter class within JavaScript. Since the 'instance' variable is intended to
- be local scope (not global), it must be named 'instance' as it is not passed back to the processor upon script evaluation and must be
- fetched. If you make it global, you can name it whatever you'd like...but this is intended to be multi-threaded so do so at your own
- risk.</p>
-Presently, there are issues with the JavaScript scripting engine that prevent sub-classing the base classes in the Processor's Java
-framework. So, what is actually happening is an instance of the ReaderScript is created with a provided callback object. When we are able
-to move to a more competent scripting engine (supposedly in Java 8), the code below will remain the same, but the 'instance' variable
-will actually be a sub-class of ReaderScript.
-<pre>
- with (Scripting) {
- var instance = new ReaderScript({
- route : function(input) {
- var str = IOUtils.toString(input);
- var expr = instance.getProperty("expr");
- var filename = instance.attributes.get("filename");
- instance.setAttribute("filename", filename + ".modified");
- if (str.match(expr)) {
- return Script.FAIL_RELATIONSHIP;
- } else {
- return Script.SUCCESS_RELATIONSHIP;
- }
- }
- });
- }
-</pre>
-Ruby example - the 'OutputStreamHandler' is an interface which is called when creating flow files.
-<pre>
- java_import 'org.apache.nifi.scripting.OutputStreamHandler'
- class SimpleConverter &lt; ConverterScript
- field_reader :FAIL_RELATIONSHIP, :SUCCESS_RELATIONSHIP, :logger, :attributes
-
- def convert(input)
- in_io = input.to_io
- createFlowFile("firstLine", FAIL_RELATIONSHIP, OutputStreamHandler.impl do |method, out|
- out_io = out.to_io
- out_io &lt;&lt; in_io.readline.to_java_bytes
- out_io.close
- logger.debug("Wrote data to failure...this message logged with logger from super class")
- end)
-
- createFlowFile("otherLines", SUCCESS_RELATIONSHIP, OutputStreamHandler.impl do |method, out|
- out_io = out.to_io
- in_io.each_line { |line|
- out_io &lt;&lt; line
- }
- out_io.close
- logger.debug("Wrote data to success...this message logged with logger from super class")
- end)
- in_io.close
- end
-
- end
-
- $logger.debug("Creating SimpleConverter...this message logged with logger from shared variables")
- SimpleConverter.new
-</pre>
-Python example - The difficulty with Python is that it does not return objects upon script evaluation, so the instance of the Script
-class must be fetched by name. Thus, you must define a variable called 'instance'.
-<pre>
- import re
-
- class RoutingReader(ReaderScript):
- A = Relationship.Builder().name("a").description("some good stuff").build()
- B = Relationship.Builder().name("b").description("some other stuff").build()
- C = Relationship.Builder().name("c").description("some bad stuff").build()
-
- def getRelationships(self):
- return [self.A,self.B,self.C]
-
- def getExceptionRoute(self):
- return self.C
-
- def route( self, input ):
- logger.info("Executing route")
- for line in FileUtil.wrap(input):
- if re.match("^bad", line, re.IGNORECASE):
- return self.B
- if re.match("^sed", line):
- raise RuntimeError("That's no good!")
-
- return self.A
- logger.debug("Constructing instance")
- instance = RoutingReader()
-
-</pre>
-</ul>
-<p>
- <strong>Script API:</strong>
-</p>
-<ul>
- <li>getAttribute(String) : String</li>
- <li>getAttributes() : Map(String,String)</li>
- <li>getExceptionRoute() : Relationship</li>
- <li>getFileName() : String</li>
- <li>getFlowFileEntryDate() : Calendar</li>
- <li>getFlowFileSize() : long</li>
- <li>getProperties() : Map(String, String)</li>
- <li>getProperty(String) : String</li>
- <li>getPropertyDescriptors() : List(PropertyDescriptor)</li>
- <li>getRelationships() : Collection(Relationship)</li>
- <li>getRoute() : Relationship</li>
- <li>setRoute(Relationship)</li>
- <li>setAttribute(String, String)</li>
- <li>validate() : Collection(String)</li>
-</ul>
-<p>
- <strong>ReaderScript API:</strong>
-</p>
-<ul>
- <li>route(InputStream) : Relationship</li>
-</ul>
-<p>
- <strong>WriterScript API:</strong>
-</p>
-<ul>
- <li>process(InputStream, OutputStream)</li>
-</ul>
-<p>
- <strong>ConverterScript API:</strong>
-</p>
-<ul>
- <li>convert(InputStream)</li>
- <li>createFlowFile(String, Relationship, OutputStreamHandler)</li>
-</ul>
-<p>
- <strong>OutputStreamHandler API:</strong>
-</p>
-<ul>
- <li>write(OutputStream)</li>
-</ul>
-</body>
-</html>