Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/26 15:16:44 UTC

[28/47] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
index 0000000,a1fc86d..4827ee3
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
@@@ -1,0 -1,429 +1,429 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import static javax.xml.xpath.XPathConstants.NODESET;
+ import static javax.xml.xpath.XPathConstants.STRING;
+ 
+ import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.io.UnsupportedEncodingException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import javax.xml.namespace.QName;
+ import javax.xml.transform.ErrorListener;
+ import javax.xml.transform.OutputKeys;
+ import javax.xml.transform.Source;
+ import javax.xml.transform.Transformer;
+ import javax.xml.transform.TransformerException;
+ import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.TransformerFactoryConfigurationError;
+ import javax.xml.transform.stream.StreamResult;
+ import javax.xml.xpath.XPathExpression;
+ import javax.xml.xpath.XPathExpressionException;
+ import javax.xml.xpath.XPathFactory;
+ import javax.xml.xpath.XPathFactoryConfigurationException;
+ 
+ import net.sf.saxon.lib.NamespaceConstant;
+ import net.sf.saxon.xpath.XPathEvaluator;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.xml.sax.InputSource;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"XML", "evaluate", "XPath"})
+ @CapabilityDescription("Evaluates one or more XPaths against the content of a FlowFile. The results of those XPaths are assigned to "
+         + "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the "
+         + "Processor. XPaths are entered by adding user-defined properties; the name of the property maps to the Attribute "
+         + "Name into which the result will be placed (if the Destination is flowfile-content; otherwise, the property name is ignored). "
+         + "The value of the property must be a valid XPath expression. If the XPath evaluates to more than one node and the Return Type is "
+         + "set to 'nodeset' (either directly, or via 'auto-detect' with a Destination of "
+         + "'flowfile-content', the FlowFile will be unmodified and will be routed to failure. If the XPath does not "
+         + "evaluate to a Node, the FlowFile will be routed to 'unmatched' without having its contents modified. If Destination is "
+         + "flowfile-attribute and the expression matches nothing, attributes will be created with empty strings as the value, and the "
+         + "FlowFile will always be routed to 'matched'")
+ public class EvaluateXPath extends AbstractProcessor {
+ 
+     public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
+     public static final String DESTINATION_CONTENT = "flowfile-content";
+     public static final String RETURN_TYPE_AUTO = "auto-detect";
+     public static final String RETURN_TYPE_NODESET = "nodeset";
+     public static final String RETURN_TYPE_STRING = "string";
+ 
+     public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+             .name("Destination")
+             .description("Indicates whether the results of the XPath evaluation are written to the FlowFile content or a FlowFile attribute; if using attribute, must specify the Attribute Name property. If set to flowfile-content, only one XPath may be specified, and the property name is ignored.")
+             .required(true)
+             .allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE)
+             .defaultValue(DESTINATION_CONTENT)
+             .build();
+ 
+     public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder()
+             .name("Return Type")
+             .description("Indicates the desired return type of the Xpath expressions.  Selecting 'auto-detect' will set the return type to 'nodeset' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
+             .required(true)
+             .allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_NODESET, RETURN_TYPE_STRING)
+             .defaultValue(RETURN_TYPE_AUTO)
+             .build();
+ 
+     public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the XPath is successfully evaluated and the FlowFile is modified as a result").build();
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles are routed to this relationship when the XPath does not match the content of the FlowFile and the Destination is set to flowfile-content").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when the XPath cannot be evaluated against the content of the FlowFile; for instance, if the FlowFile is not valid XML, or if the Return Type is 'nodeset' and the XPath evaluates to multiple nodes").build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     private final AtomicReference<XPathFactory> factoryRef = new AtomicReference<>();
+ 
+     static {
+         System.setProperty("javax.xml.xpath.XPathFactory:" + NamespaceConstant.OBJECT_MODEL_SAXON,
+                 "net.sf.saxon.xpath.XPathFactoryImpl");
+     }
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_MATCH);
+         relationships.add(REL_NO_MATCH);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(DESTINATION);
+         properties.add(RETURN_TYPE);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+         final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+ 
+         final String destination = context.getProperty(DESTINATION).getValue();
+         if (DESTINATION_CONTENT.equals(destination)) {
+             int xpathCount = 0;
+ 
+             for (final PropertyDescriptor desc : context.getProperties().keySet()) {
+                 if (desc.isDynamic()) {
+                     xpathCount++;
+                 }
+             }
+ 
+             if (xpathCount != 1) {
+                 results.add(new ValidationResult.Builder().subject("XPaths").valid(false).explanation("Exactly one XPath must be set if using destination of " + DESTINATION_CONTENT).build());
+             }
+         }
+ 
+         return results;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @OnScheduled
+     public void initializeXPathFactory() throws XPathFactoryConfigurationException {
+         factoryRef.set(XPathFactory.newInstance(NamespaceConstant.OBJECT_MODEL_SAXON));
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .name(propertyDescriptorName)
+                 .expressionLanguageSupported(false)
+                 .addValidator(new XPathValidator())
+                 .required(false)
+                 .dynamic(true)
+                 .build();
+     }
+ 
+     @Override
+     @SuppressWarnings("unchecked")
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final List<FlowFile> flowFiles = session.get(50);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final XPathFactory factory = factoryRef.get();
+         final XPathEvaluator xpathEvaluator = (XPathEvaluator) factory.newXPath();
+         final Map<String, XPathExpression> attributeToXPathMap = new HashMap<>();
+ 
+         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+             if (!entry.getKey().isDynamic()) {
+                 continue;
+             }
+             final XPathExpression xpathExpression;
+             try {
+                 xpathExpression = xpathEvaluator.compile(entry.getValue());
+                 attributeToXPathMap.put(entry.getKey().getName(), xpathExpression);
+             } catch (XPathExpressionException e) {
+                 throw new ProcessException(e);  // should not happen because we've already validated the XPath (in XPathValidator)
+             }
+         }
+ 
+         final XPathExpression slashExpression;
+         try {
+             slashExpression = xpathEvaluator.compile("/");
+         } catch (XPathExpressionException e) {
+             logger.error("unable to compile XPath expression due to {}", new Object[]{e});
+             session.transfer(flowFiles, REL_FAILURE);
+             return;
+         }
+ 
+         final String destination = context.getProperty(DESTINATION).getValue();
+         final QName returnType;
+ 
+         switch (context.getProperty(RETURN_TYPE).getValue()) {
+             case RETURN_TYPE_AUTO:
+                 if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                     returnType = STRING;
+                 } else if (DESTINATION_CONTENT.equals(destination)) {
+                     returnType = NODESET;
+                 } else {
+                     throw new IllegalStateException("The only possible destinations should be CONTENT or ATTRIBUTE...");
+                 }
+                 break;
+             case RETURN_TYPE_NODESET:
+                 returnType = NODESET;
+                 break;
+             case RETURN_TYPE_STRING:
+                 returnType = STRING;
+                 break;
+             default:
+                 throw new IllegalStateException("There are no other return types...");
+         }
+ 
+         flowFileLoop:
+         for (FlowFile flowFile : flowFiles) {
+             final ObjectHolder<Throwable> error = new ObjectHolder<>(null);
+             final ObjectHolder<Source> sourceRef = new ObjectHolder<>(null);
+ 
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream rawIn) throws IOException {
+                     try (final InputStream in = new BufferedInputStream(rawIn)) {
+                         final List<Source> rootList = (List<Source>) slashExpression.evaluate(new InputSource(in), NODESET);
+                         sourceRef.set(rootList.get(0));
+                     } catch (final Exception e) {
+                         error.set(e);
+                     }
+                 }
+             });
+ 
+             if (error.get() != null) {
+                 logger.error("unable to evaluate XPath against {} due to {}; routing to 'failure'", new Object[]{flowFile, error.get()});
+                 session.transfer(flowFile, REL_FAILURE);
+                 continue;
+             }
+ 
+             final Map<String, String> xpathResults = new HashMap<>();
+ 
+             for (final Map.Entry<String, XPathExpression> entry : attributeToXPathMap.entrySet()) {
+                 Object result = null;
+                 try {
+                     result = entry.getValue().evaluate(sourceRef.get(), returnType);
+                     if (result == null) {
+                         continue;
+                     }
+                 } catch (final XPathExpressionException e) {
+                     logger.error("failed to evaluate XPath for {} for Property {} due to {}; routing to failure", new Object[]{flowFile, entry.getKey(), e});
+                     session.transfer(flowFile, REL_FAILURE);
+                     continue flowFileLoop;
+                 }
+ 
+                 if (returnType == NODESET) {
+                     List<Source> nodeList = (List<Source>) result;
+                     if (nodeList.isEmpty()) {
+                         logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+                         session.transfer(flowFile, REL_NO_MATCH);
+                         continue flowFileLoop;
+                     } else if (nodeList.size() > 1) {
+                         logger.error("Routing {} to 'failure' because the XPath evaluated to {} XML nodes", new Object[]{
+                             flowFile, nodeList.size()});
+                         session.transfer(flowFile, REL_FAILURE);
+                         continue flowFileLoop;
+                     }
+                     final Source sourceNode = nodeList.get(0);
+ 
+                     if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                         try {
+                             ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                             doTransform(sourceNode, baos);
+                             xpathResults.put(entry.getKey(), baos.toString("UTF-8"));
+                         } catch (UnsupportedEncodingException e) {
+                             throw new ProcessException(e); // this REALLY shouldn't happen						
+                         } catch (TransformerException e) {
+                             error.set(e);
+                         }
+ 
+                     } else if (DESTINATION_CONTENT.equals(destination)) {
+                         flowFile = session.write(flowFile, new OutputStreamCallback() {
+                             @Override
+                             public void process(final OutputStream rawOut) throws IOException {
+                                 try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                     doTransform(sourceNode, out);
+                                 } catch (TransformerException e) {
+                                     error.set(e);
+                                 }
+                             }
+                         });
+                     }
+ 
+                 } else if (returnType == STRING) {
+                     final String resultString = (String) result;
+ 
+                     if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                         xpathResults.put(entry.getKey(), resultString);
+                     } else if (DESTINATION_CONTENT.equals(destination)) {
+                         flowFile = session.write(flowFile, new OutputStreamCallback() {
+                             @Override
+                             public void process(final OutputStream rawOut) throws IOException {
+                                 try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                     out.write(resultString.getBytes("UTF-8"));
+                                 }
+                             }
+                         });
+                     }
+                 }
+             }
+ 
+             if (error.get() == null) {
+                 if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                     flowFile = session.putAllAttributes(flowFile, xpathResults);
+                     final Relationship destRel = xpathResults.isEmpty() ? REL_NO_MATCH : REL_MATCH;
+                     logger.info("Successfully evaluated XPaths against {} and found {} matches; routing to {}", new Object[]{flowFile,
+                         xpathResults.size(), destRel.getName()});
+                     session.transfer(flowFile, destRel);
+                     session.getProvenanceReporter().modifyAttributes(flowFile);
+                 } else if (DESTINATION_CONTENT.equals(destination)) {
+                     logger.info("Successfully updated content for {}; routing to 'matched'", new Object[]{flowFile});
+                     session.transfer(flowFile, REL_MATCH);
+                     session.getProvenanceReporter().modifyContent(flowFile);
+                 }
+             } else {
+                 logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{flowFile, error.get()});
+                 session.transfer(flowFile, REL_FAILURE);
+             }
+         }
+     }
+ 
+     private void doTransform(final Source sourceNode, OutputStream out) throws TransformerFactoryConfigurationError, TransformerException {
+         final Transformer transformer;
+         try {
+             transformer = TransformerFactory.newInstance().newTransformer();
+         } catch (final Exception e) {
+             throw new ProcessException(e);
+         }
+ 
+         final Properties props = new Properties();
+         props.setProperty(OutputKeys.METHOD, "xml");
+         props.setProperty(OutputKeys.INDENT, "no");
+         props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+         transformer.setOutputProperties(props);
+ 
+         final ProcessorLog logger = getLogger();
+         
+         final ObjectHolder<TransformerException> error = new ObjectHolder<>(null);
+         transformer.setErrorListener(new ErrorListener() {
+             @Override
+             public void warning(final TransformerException exception) throws TransformerException {
+                 logger.warn("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
+             }
+ 
+             @Override
+             public void error(final TransformerException exception) throws TransformerException {
+                 logger.error("Encountered error from XPath Engine: ", new Object[] {exception.toString(), exception});
+                 error.set(exception);
+             }
+ 
+             @Override
+             public void fatalError(final TransformerException exception) throws TransformerException {
+                 logger.error("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
+                 error.set(exception);
+             }
+         });
+         
+         transformer.transform(sourceNode, new StreamResult(out));
+         if ( error.get() != null ) {
+             throw error.get();
+         }
+     }
+ 
+     private static class XPathValidator implements Validator {
+ 
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             try {
+                 XPathFactory factory = XPathFactory.newInstance(NamespaceConstant.OBJECT_MODEL_SAXON);
+                 final XPathEvaluator evaluator = (XPathEvaluator) factory.newXPath();
+ 
+                 String error = null;
+                 try {
+                     evaluator.compile(input);
+                 } catch (final Exception e) {
+                     error = e.toString();
+                 }
+ 
+                 return new ValidationResult.Builder().input(input).subject(subject).valid(error == null).explanation(error).build();
+             } catch (final Exception e) {
+                 return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Unable to initialize XPath engine due to " + e.toString()).build();
+             }
+         }
+     }
+ }
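
As a quick illustration of the behaviour described in the @CapabilityDescription above, here is a minimal sketch of driving the processor through NiFi's mock framework. It assumes the nifi-mock TestRunner API and JUnit 4; the XML payload and the "book.title" dynamic property are invented for the example and are not part of this commit.

    package org.apache.nifi.processors.standard;

    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class EvaluateXPathSketchTest {

        @Test
        public void extractsTitleIntoAttribute() {
            final TestRunner runner = TestRunners.newTestRunner(new EvaluateXPath());
            // Send results to attributes; 'auto-detect' then resolves the Return Type to 'string'
            runner.setProperty(EvaluateXPath.DESTINATION, EvaluateXPath.DESTINATION_ATTRIBUTE);
            runner.setProperty(EvaluateXPath.RETURN_TYPE, EvaluateXPath.RETURN_TYPE_AUTO);
            // Dynamic property: the name becomes the attribute name, the value is the XPath
            runner.setProperty("book.title", "/catalog/book/title");

            runner.enqueue("<catalog><book><title>Getting Started</title></book></catalog>"
                    .getBytes(StandardCharsets.UTF_8));
            runner.run();

            // One result was found, so the FlowFile goes to 'matched' with the new attribute set
            runner.assertAllFlowFilesTransferred(EvaluateXPath.REL_MATCH, 1);
            final MockFlowFile out = runner.getFlowFilesForRelationship(EvaluateXPath.REL_MATCH).get(0);
            out.assertAttributeEquals("book.title", "Getting Started");
        }
    }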

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
index 0000000,8b4ce09..3ddee83
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
@@@ -1,0 -1,463 +1,463 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ 
+ import javax.xml.parsers.DocumentBuilderFactory;
+ import javax.xml.transform.OutputKeys;
+ import javax.xml.transform.Transformer;
+ import javax.xml.transform.TransformerConfigurationException;
+ import javax.xml.transform.TransformerException;
+ import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.TransformerFactoryConfigurationError;
+ import javax.xml.transform.sax.SAXSource;
+ import javax.xml.transform.stream.StreamResult;
+ 
+ import net.sf.saxon.s9api.DOMDestination;
+ import net.sf.saxon.s9api.Processor;
+ import net.sf.saxon.s9api.SaxonApiException;
+ import net.sf.saxon.s9api.XQueryCompiler;
+ import net.sf.saxon.s9api.XQueryEvaluator;
+ import net.sf.saxon.s9api.XQueryExecutable;
+ import net.sf.saxon.s9api.XdmItem;
+ import net.sf.saxon.s9api.XdmNode;
+ import net.sf.saxon.s9api.XdmValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.ObjectHolder;
+ 
+ import org.w3c.dom.Document;
+ import org.xml.sax.InputSource;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"XML", "evaluate", "XPath", "XQuery", "experimental"})
+ @CapabilityDescription(
+         "Evaluates one or more XQueries against the content of a FlowFile.  The results of those XQueries are assigned "
+         + "to FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of "
+         + "the Processor.  XQueries are entered by adding user-defined properties; the name of the property maps to the "
+         + "Attribute Name into which the result will be placed (if the Destination is 'flowfile-attribute'; otherwise, "
+         + "the property name is ignored).  The value of the property must be a valid XQuery.  If the XQuery returns more "
+         + "than one result, new attributes or FlowFiles (for Destinations of 'flowfile-attribute' or 'flowfile-content' "
+         + "respectively) will be created for each result (attributes will have a '.n' one-up number appended to the "
+         + "specified attribute name).  If any provided XQuery returns a result, the FlowFile(s) will be routed to "
+         + "'matched'. If no provided XQuery returns a result, the FlowFile will be routed to 'unmatched'.  If the "
+         + "Destination is 'flowfile-attribute' and the XQueries matche nothing, no attributes will be applied to the "
+         + "FlowFile.")
+ 
+ public class EvaluateXQuery extends AbstractProcessor {
+ 
+     public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
+     public static final String DESTINATION_CONTENT = "flowfile-content";
+ 
+     public static final String OUTPUT_METHOD_XML = "xml";
+     public static final String OUTPUT_METHOD_HTML = "html";
+     public static final String OUTPUT_METHOD_TEXT = "text";
+ 
+     public static final String UTF8 = "UTF-8";
+ 
+     public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+             .name("Destination")
+             .description(
+                     "Indicates whether the results of the XQuery evaluation are written to the FlowFile content or a "
+                     + "FlowFile attribute. If set to <flowfile-content>, only one XQuery may be specified and the property "
+                     + "name is ignored.  If set to <flowfile-attribute> and the XQuery returns more than one result, "
+                     + "multiple attributes will be added to theFlowFile, each named with a '.n' one-up number appended to "
+                     + "the specified attribute name")
+             .required(true)
+             .allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE)
+             .defaultValue(DESTINATION_CONTENT)
+             .build();
+ 
+     public static final PropertyDescriptor XML_OUTPUT_METHOD = new PropertyDescriptor.Builder()
+             .name("Output: Method")
+             .description("Identifies the overall method that should be used for outputting a result tree.")
+             .required(true)
+             .allowableValues(OUTPUT_METHOD_XML, OUTPUT_METHOD_HTML, OUTPUT_METHOD_TEXT)
+             .defaultValue(OUTPUT_METHOD_XML)
+             .build();
+ 
+     public static final PropertyDescriptor XML_OUTPUT_OMIT_XML_DECLARATION = new PropertyDescriptor.Builder()
+             .name("Output: Omit XML Declaration")
+             .description("Specifies whether the processor should output an XML declaration when transforming a result tree.")
+             .required(true)
+             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor XML_OUTPUT_INDENT = new PropertyDescriptor.Builder()
+             .name("Output: Indent")
+             .description("Specifies whether the processor may add additional whitespace when outputting a result tree.")
+             .required(true)
+             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+             .defaultValue("false")
+             .build();
+ 
+     public static final Relationship REL_MATCH = new Relationship.Builder()
+             .name("matched")
+             .description(
+                     "FlowFiles are routed to this relationship when the XQuery is successfully evaluated and the FlowFile "
+                     + "is modified as a result")
+             .build();
+ 
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder()
+             .name("unmatched")
+             .description(
+                     "FlowFiles are routed to this relationship when the XQuery does not match the content of the FlowFile "
+                     + "and the Destination is set to flowfile-content")
+             .build();
+ 
+     public static final Relationship REL_FAILURE = new Relationship.Builder()
+             .name("failure")
+             .description(
+                     "FlowFiles are routed to this relationship when the XQuery cannot be evaluated against the content of "
+                     + "the FlowFile.")
+             .build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_MATCH);
+         relationships.add(REL_NO_MATCH);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(DESTINATION);
+         properties.add(XML_OUTPUT_METHOD);
+         properties.add(XML_OUTPUT_OMIT_XML_DECLARATION);
+         properties.add(XML_OUTPUT_INDENT);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+         final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+ 
+         final String destination = context.getProperty(DESTINATION).getValue();
+         if (DESTINATION_CONTENT.equals(destination)) {
+             int xQueryCount = 0;
+             for (final PropertyDescriptor desc : context.getProperties().keySet()) {
+                 if (desc.isDynamic()) {
+                     xQueryCount++;
+                 }
+             }
+             if (xQueryCount != 1) {
+                 results.add(new ValidationResult.Builder()
+                         .subject("XQueries")
+                         .valid(false)
+                         .explanation("Exactly one XQuery must be set if using destination of " + DESTINATION_CONTENT)
+                         .build());
+             }
+         }
+         return results;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .name(propertyDescriptorName)
+                 .expressionLanguageSupported(false)
+                 .addValidator(new XQueryValidator())
+                 .required(false)
+                 .dynamic(true)
+                 .build();
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final List<FlowFile> flowFileBatch = session.get(50);
+         if (flowFileBatch.isEmpty()) {
+             return;
+         }
+         final ProcessorLog logger = getLogger();
+         final Map<String, XQueryExecutable> attributeToXQueryMap = new HashMap<>();
+ 
+         final Processor proc = new Processor(false);
+         final XQueryCompiler comp = proc.newXQueryCompiler();
+ 
+         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+             if (!entry.getKey().isDynamic()) {
+                 continue;
+             }
+             final XQueryExecutable exp;
+             try {
+                 exp = comp.compile(entry.getValue());
+                 attributeToXQueryMap.put(entry.getKey().getName(), exp);
+             } catch (SaxonApiException e) {
+                 throw new ProcessException(e);  // should not happen because we've already validated the XQuery (in XQueryValidator)
+             }
+         }
+ 
+         final XQueryExecutable slashExpression;
+         try {
+             slashExpression = comp.compile("/");
+         } catch (SaxonApiException e) {
+             logger.error("unable to compile XQuery expression due to {}", new Object[]{e});
+             session.transfer(flowFileBatch, REL_FAILURE);
+             return;
+         }
+ 
+         final String destination = context.getProperty(DESTINATION).getValue();
+ 
+         flowFileLoop:
+         for (FlowFile flowFile : flowFileBatch) {
+             if (!isScheduled()) {
+                 session.rollback();
+                 return;
+             }
+ 
+             final ObjectHolder<Throwable> error = new ObjectHolder<>(null);
+             final ObjectHolder<XdmNode> sourceRef = new ObjectHolder<>(null);
+ 
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream rawIn) throws IOException {
+                     try (final InputStream in = new BufferedInputStream(rawIn)) {
+                         XQueryEvaluator qe = slashExpression.load();
+                         qe.setSource(new SAXSource(new InputSource(in)));
+                         DocumentBuilderFactory dfactory = DocumentBuilderFactory.newInstance();
+                         dfactory.setNamespaceAware(true);
+                         Document dom = dfactory.newDocumentBuilder().newDocument();
+                         qe.run(new DOMDestination(dom));
+                         XdmNode rootNode = proc.newDocumentBuilder().wrap(dom);
+                         sourceRef.set(rootNode);
+                     } catch (final Exception e) {
+                         error.set(e);
+                     }
+                 }
+             });
+ 
+             if (error.get() != null) {
+                 logger.error("unable to evaluate XQuery against {} due to {}; routing to 'failure'",
+                         new Object[]{flowFile, error.get()});
+                 session.transfer(flowFile, REL_FAILURE);
+                 continue;
+             }
+ 
+             final Map<String, String> xQueryResults = new HashMap<>();
+             List<FlowFile> childrenFlowFiles = new ArrayList<>();
+ 
+             for (final Map.Entry<String, XQueryExecutable> entry : attributeToXQueryMap.entrySet()) {
+                 try {
+                     XQueryEvaluator qe = entry.getValue().load();
+                     qe.setContextItem(sourceRef.get());
+                     XdmValue result = qe.evaluate();
+ 
+                     if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                         int index = 1;
+                         for (XdmItem item : result) {
+                             String value = formatItem(item, context);
+                             String attributeName = entry.getKey();
+                             if (result.size() > 1) {
+                                 attributeName += "." + index++;
+                             }
+                             xQueryResults.put(attributeName, value);
+                         }
+                     } else { // if (DESTINATION_CONTENT.equals(destination)){
+                         if (result.size() == 0) {
+                             logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+                             session.transfer(flowFile, REL_NO_MATCH);
+                             continue flowFileLoop;
+                         } else if (result.size() == 1) {
+                             final XdmItem item = result.itemAt(0);
+                             flowFile = session.write(flowFile, new OutputStreamCallback() {
+                                 @Override
+                                 public void process(final OutputStream rawOut) throws IOException {
+                                     try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                         writeformattedItem(item, context, out);
+                                     } catch (TransformerFactoryConfigurationError | TransformerException e) {
+                                         throw new IOException(e);
+                                     }
+                                 }
+                             });
+                         } else {
+                             for (final XdmItem item : result) {
+                                 FlowFile ff = session.clone(flowFile);
+                                 ff = session.write(ff, new OutputStreamCallback() {
+                                     @Override
+                                     public void process(final OutputStream rawOut) throws IOException {
+                                         try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                             try {
+                                                 writeformattedItem(item, context, out);
+                                             } catch (TransformerFactoryConfigurationError | TransformerException e) {
+                                                 throw new IOException(e);
+                                             }
+                                         }
+                                     }
+                                 });
+                                 childrenFlowFiles.add(ff);
+                             }
+                         }
+                     }
+                 } catch (final SaxonApiException e) {
+                     logger.error("failed to evaluate XQuery for {} for Property {} due to {}; routing to failure",
+                             new Object[]{flowFile, entry.getKey(), e});
+                     session.transfer(flowFile, REL_FAILURE);
+                     session.remove(childrenFlowFiles);
+                     continue flowFileLoop;
+                 } catch (TransformerFactoryConfigurationError | TransformerException | IOException e) {
+                     logger.error("Failed to write XQuery result for {} due to {}; routing original to 'failure'", new Object[]{
+                         flowFile, error.get()});
+                     session.transfer(flowFile, REL_FAILURE);
+                     session.remove(childrenFlowFiles);
+                     continue flowFileLoop;
+                 }
+             }
+ 
+             if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                 flowFile = session.putAllAttributes(flowFile, xQueryResults);
+                 final Relationship destRel = xQueryResults.isEmpty() ? REL_NO_MATCH : REL_MATCH;
+                 logger.info("Successfully evaluated XQueries against {} and found {} matches; routing to {}",
+                         new Object[]{flowFile, xQueryResults.size(), destRel.getName()});
+                 session.transfer(flowFile, destRel);
+                 session.getProvenanceReporter().modifyAttributes(flowFile);
+             } else { // if (DESTINATION_CONTENT.equals(destination)) {
+                 if (!childrenFlowFiles.isEmpty()) {
+                     logger.info("Successfully created {} new FlowFiles from {}; routing all to 'matched'",
+                             new Object[]{childrenFlowFiles.size(), flowFile});
+                     session.transfer(childrenFlowFiles, REL_MATCH);
+                     session.remove(flowFile);
+                 } else {
+                     logger.info("Successfully updated content for {}; routing to 'matched'", new Object[]{flowFile});
+                     session.transfer(flowFile, REL_MATCH);
+                     session.getProvenanceReporter().modifyContent(flowFile);
+                 }
+             }
+         } // end flowFileLoop
+     }
+ 
+     private String formatItem(XdmItem item, ProcessContext context) throws TransformerConfigurationException,
+             TransformerFactoryConfigurationError, TransformerException, IOException {
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         writeformattedItem(item, context, baos);
+         return baos.toString(UTF8);
+     }
+ 
+     void writeformattedItem(XdmItem item, ProcessContext context, OutputStream out) throws TransformerConfigurationException,
+             TransformerFactoryConfigurationError, TransformerException, IOException {
+ 
+         if (item.isAtomicValue()) {
+             out.write(item.getStringValue().getBytes(UTF8));
+         } else { // item is an XdmNode
+             XdmNode node = (XdmNode) item;
+             switch (node.getNodeKind()) {
+                 case DOCUMENT:
+                 case ELEMENT:
+                     Transformer transformer = TransformerFactory.newInstance().newTransformer();
+                     final Properties props = getTransformerProperties(context);
+                     transformer.setOutputProperties(props);
+                     transformer.transform(node.asSource(), new StreamResult(out));
+                     break;
+                 default:
+                     out.write(node.getStringValue().getBytes(UTF8));
+             }
+         }
+     }
+ 
+     private Properties getTransformerProperties(ProcessContext context) {
+         final String method = context.getProperty(XML_OUTPUT_METHOD).getValue();
+         boolean indent = context.getProperty(XML_OUTPUT_INDENT).asBoolean();
+         boolean omitDeclaration = context.getProperty(XML_OUTPUT_OMIT_XML_DECLARATION).asBoolean();
+         return getTransformerProperties(method, indent, omitDeclaration);
+     }
+ 
+     static Properties getTransformerProperties(final String method, final boolean indent, final boolean omitDeclaration) {
+         final Properties props = new Properties();
+         props.setProperty(OutputKeys.METHOD, method);
+         props.setProperty(OutputKeys.INDENT, indent ? "yes" : "no");
+         props.setProperty(OutputKeys.OMIT_XML_DECLARATION, omitDeclaration ? "yes" : "no");
+         return props;
+     }
+ 
+     private static class XQueryValidator implements Validator {
+ 
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             try {
+                 final Processor proc = new Processor(false);
+                 final XQueryCompiler comp = proc.newXQueryCompiler();
+                 String error = null;
+                 try {
+                     comp.compile(input);
+                 } catch (final Exception e) {
+                     error = e.toString();
+                 }
+                 return new ValidationResult.Builder()
+                         .input(input)
+                         .subject(subject)
+                         .valid(error == null)
+                         .explanation(error)
+                         .build();
+             } catch (final Exception e) {
+                 return new ValidationResult.Builder()
+                         .input(input)
+                         .subject(subject)
+                         .valid(false)
+                         .explanation("Unable to initialize XQuery engine due to " + e.toString())
+                         .build();
+             }
+         }
+     }
+ }
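
Likewise, a minimal sketch of the '.n' one-up attribute naming described in the @CapabilityDescription above, for the case where a single XQuery yields multiple results and the Destination is 'flowfile-attribute'. Same assumptions as the previous sketch (nifi-mock TestRunner, JUnit 4); the XML payload and the "title" property are invented for the example.

    package org.apache.nifi.processors.standard;

    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class EvaluateXQuerySketchTest {

        @Test
        public void multipleResultsGetOneUpSuffixes() {
            final TestRunner runner = TestRunners.newTestRunner(new EvaluateXQuery());
            runner.setProperty(EvaluateXQuery.DESTINATION, EvaluateXQuery.DESTINATION_ATTRIBUTE);
            // Dynamic property: the XQuery; two matching titles should yield 'title.1' and 'title.2'
            runner.setProperty("title", "//title/text()");

            runner.enqueue(("<catalog>"
                    + "<book><title>First</title></book>"
                    + "<book><title>Second</title></book>"
                    + "</catalog>").getBytes(StandardCharsets.UTF_8));
            runner.run();

            runner.assertAllFlowFilesTransferred(EvaluateXQuery.REL_MATCH, 1);
            final MockFlowFile out = runner.getFlowFilesForRelationship(EvaluateXQuery.REL_MATCH).get(0);
            out.assertAttributeEquals("title.1", "First");
            out.assertAttributeEquals("title.2", "Second");
        }
    }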

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 0000000,ab0b2aa..dda3647
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@@ -1,0 -1,358 +1,358 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.lang.ProcessBuilder.Redirect;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeExpression.ResultType;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.lang3.StringUtils;
+ 
+ /**
+  * <p>
+  * This processor executes an external command on the contents of a flow file,
+  * and creates a new flow file with the results of the command.
+  * </p>
+  * <p>
+  * <strong>Properties:</strong>
+  * </p>
+  * <ul>
+  * <li><strong>Command Path</strong>
+  * <ul>
+  * <li>Specifies the command to be executed; if just the name of an executable
+  * is provided, it must be in the user's environment PATH.</li>
+  * <li>Default value: none</li>
+  * <li>Supports expression language: true</li>
+  * </ul>
+  * </li>
+  * <li>Command Arguments
+  * <ul>
+  * <li>The arguments to supply to the executable, delimited by the ';' character.
+  * Each argument may be an Expression Language statement.</li>
+  * <li>Default value: none</li>
+  * <li>Supports expression language: true</li>
+  * </ul>
+  * </li>
+  * <li>Working Directory
+  * <ul>
+  * <li>The directory to use as the current working directory when executing the
+  * command</li>
+  * <li>Default value: none (the command is executed in NiFi's current working
+  * directory, typically the root of the NiFi installation directory)</li>
+  * <li>Supports expression language: true</li>
+  * </ul>
+  * </li>
+  *
+  * </ul>
+  *
+  * <p>
+  * <strong>Relationships:</strong>
+  * </p>
+  * <ul>
+  * <li>original
+  * <ul>
+  * <li>The destination path for the original incoming flow file</li>
+  * </ul>
+  * </li>
+  * <li>output stream
+  * <ul>
+  * <li>The destination path for the flow file created from the command's
+  * output</li>
+  * </ul>
+  * </li>
+  * </ul>
+  * <p>
+  *
+  * @author unattributed
+  */
+ @EventDriven
+ @SupportsBatching
+ @Tags({"command execution", "command", "stream", "execute"})
+ @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
+ public class ExecuteStreamCommand extends AbstractProcessor {
+ 
+     public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()
+             .name("original")
+             .description("FlowFiles that were successfully processed")
+             .build();
+     public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder()
+             .name("output stream")
+             .description("The destination path for the flow file created from the command's output")
+             .build();
+     private static final Set<Relationship> RELATIONSHIPS;
+ 
+     static {
+         Set<Relationship> rels = new HashSet<>();
+         rels.add(OUTPUT_STREAM_RELATIONSHIP);
+         rels.add(ORIGINAL_RELATIONSHIP);
+         RELATIONSHIPS = Collections.unmodifiableSet(rels);
+     }
+ 
+     private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(
+             ResultType.STRING, true);
+     static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
+             .name("Command Path")
+             .description(
+                     "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+             .expressionLanguageSupported(true)
+             .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+             .required(true)
+             .build();
+ 
+     static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder()
+             .name("Command Arguments")
+             .description("The arguments to supply to the executable delimited by the ';' character.")
+             .expressionLanguageSupported(true)
+             .addValidator(new Validator() {
+ 
+                 @Override
+                 public ValidationResult validate(String subject, String input, ValidationContext context) {
+                     ValidationResult result = new ValidationResult.Builder()
+                     .subject(subject)
+                     .valid(true)
+                     .input(input)
+                     .build();
+                     String[] args = input.split(";");
+                     for (String arg : args) {
+                         ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
+                         if (!valResult.isValid()) {
+                             result = valResult;
+                             break;
+                         }
+                     }
+                     return result;
+                 }
+             })
+             .build();
+ 
+     static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
+             .name("Working Directory")
+             .description("The directory to use as the current working directory when executing the command")
+             .expressionLanguageSupported(true)
+             .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+             .required(false)
+             .build();
+ 
+     private static final List<PropertyDescriptor> PROPERTIES;
+ 
+     static {
+         List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(EXECUTION_ARGUMENTS);
+         props.add(EXECUTION_COMMAND);
+         props.add(WORKING_DIR);
+         PROPERTIES = Collections.unmodifiableList(props);
+     }
+ 
+     private ProcessorLog logger;
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return RELATIONSHIPS;
+     }
+ 
+     @Override
+     protected void init(ProcessorInitializationContext context) {
+         logger = getLogger();
+     }
+ 
+     @Override
+     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return PROPERTIES;
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (null == flowFile) {
+             return;
+         }
+ 
+         final ArrayList<String> args = new ArrayList<>();
+         final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
+         args.add(executeCommand);
+         final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
+         if (!StringUtils.isBlank(commandArguments)) {
+             for (String arg : commandArguments.split(";")) {
+                 args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
+             }
+         }
+         final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue();
+ 
+         final ProcessBuilder builder = new ProcessBuilder();
+ 
+         logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
+         File dir = null;
+         if (!StringUtils.isBlank(workingDir)) {
+             dir = new File(workingDir);
+             if (!dir.exists() && !dir.mkdirs()) {
+                 logger.warn("Failed to create working directory {}, using current working directory {}",
+                         new Object[]{workingDir, System.getProperty("user.dir")});
+             }
+         }
+         builder.command(args);
+         builder.directory(dir);
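+         // Pipe stdin and stdout so the flow file content can be written to the command and its output read back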
+         builder.redirectInput(Redirect.PIPE);
+         builder.redirectOutput(Redirect.PIPE);
+         final Process process;
+         try {
+             process = builder.start();
+         } catch (IOException e) {
+             logger.error("Could not create external process to run command", e);
+             throw new ProcessException(e);
+         }
+         try (final OutputStream pos = process.getOutputStream();
+                 final InputStream pis = process.getInputStream();
+                 final InputStream pes = process.getErrorStream();
+                 final BufferedInputStream bis = new BufferedInputStream(pis);
+                 final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
+             int exitCode = -1;
+             final BufferedOutputStream bos = new BufferedOutputStream(pos);
+             FlowFile outputStreamFlowFile = session.create(flowFile);
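+             // Stream the flow file's content to the command's stdin while capturing the command's stdout as the content of the new flow file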
+             StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
+             session.read(flowFile, callback);
+             outputStreamFlowFile = callback.outputStreamFlowFile;
+             exitCode = callback.exitCode;
+             logger.debug("Execution complete for command: {}.  Exited with code: {}", new Object[]{executeCommand, exitCode});
+ 
+             Map<String, String> attributes = new HashMap<>();
+ 
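+             // Read the command's stderr and record up to the first 4000 characters in the 'execution.error' attribute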
+             final StringBuilder strBldr = new StringBuilder();
+             try {
+                 String line;
+                 while ((line = bufferedReader.readLine()) != null) {
+                     strBldr.append(line).append("\n");
+                 }
+             } catch (IOException e) {
+                 strBldr.append("Unknown...could not read Process's Std Error");
+             }
+             final int length = Math.min(strBldr.length(), 4000);
+             attributes.put("execution.error", strBldr.substring(0, length));
+ 
+             if (exitCode == 0) {
+                 logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile});
+             } else {
+                 logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}",
+                         new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()});
+             }
+ 
+             attributes.put("execution.status", Integer.toString(exitCode));
+             attributes.put("execution.command", executeCommand);
+             attributes.put("execution.command.args", commandArguments);
+             outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
+             session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
+             logger.info("Transferring flow file {} to original", new Object[]{flowFile});
+             flowFile = session.putAllAttributes(flowFile, attributes);
+             session.transfer(flowFile, ORIGINAL_RELATIONSHIP);
+ 
+         } catch (final IOException ex) {
+             // could not close Process related streams
+             logger.warn("Problem terminating Process {}", new Object[]{process}, ex);
+         } finally {
+             process.destroy(); // last ditch effort to clean up that process.
+         }
+     }
+ 
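+     /**
+      * InputStreamCallback that writes the contents of the incoming FlowFile to the process's standard input
+      * while copying the process's standard output into the new FlowFile, and records the process's exit code.
+      */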
+     static class StdInWriterCallback implements InputStreamCallback {
+ 
+         final OutputStream stdInWritable;
+         final InputStream stdOutReadable;
+         final ProcessorLog logger;
+         final ProcessSession session;
+         final Process process;
+         FlowFile outputStreamFlowFile;
+         int exitCode;
+ 
+         public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session,
+                 FlowFile outputStreamFlowFile, Process process) {
+             this.stdInWritable = stdInWritable;
+             this.stdOutReadable = stdOutReadable;
+             this.logger = logger;
+             this.session = session;
+             this.outputStreamFlowFile = outputStreamFlowFile;
+             this.process = process;
+         }
+ 
+         @Override
+         public void process(final InputStream incomingFlowFileIS) throws IOException {
+             outputStreamFlowFile = session.write(outputStreamFlowFile, new OutputStreamCallback() {
+ 
+                 @Override
+                 public void process(OutputStream out) throws IOException {
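+                     // Feed stdin on a separate daemon thread so that copying stdout on this thread cannot deadlock when the process's output buffer fills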
+                     Thread writerThread = new Thread(new Runnable() {
+ 
+                         @Override
+                         public void run() {
+                             try {
+                                 StreamUtils.copy(incomingFlowFileIS, stdInWritable);
+                             } catch (IOException e) {
+                                 logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
+                             }
+                             // MUST close the output stream to the stdIn so that whatever is reading knows
+                             // there is no more data
+                             IOUtils.closeQuietly(stdInWritable);
+                         }
+                     });
+                     writerThread.setDaemon(true);
+                     writerThread.start();
+                     StreamUtils.copy(stdOutReadable, out);
+                     try {
+                         exitCode = process.waitFor();
+                     } catch (InterruptedException e) {
+                         logger.warn("Command Execution Process was interrupted", e);
+                     }
+                 }
+             });
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
index 0000000,6f18a01..0ae4747
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
@@@ -1,0 -1,164 +1,164 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Random;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @SupportsBatching
+ @Tags({"test", "random", "generate"})
+ @CapabilityDescription("This processor creates FlowFiles of random data and is used for load testing")
+ public class GenerateFlowFile extends AbstractProcessor {
+ 
+     private final AtomicReference<byte[]> data = new AtomicReference<>();
+ 
+     public static final String DATA_FORMAT_BINARY = "Binary";
+     public static final String DATA_FORMAT_TEXT = "Text";
+ 
+     public static final PropertyDescriptor FILE_SIZE = new PropertyDescriptor.Builder()
+             .name("File Size")
+             .description("The size of each FlowFile that will be generated")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+             .name("Batch Size")
+             .description("The number of FlowFiles to be transferred in each invocation")
+             .required(true)
+             .defaultValue("1")
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DATA_FORMAT = new PropertyDescriptor.Builder()
+             .name("Data Format")
+             .description("Specifies whether the data should be Text or Binary")
+             .required(true)
+             .defaultValue(DATA_FORMAT_BINARY)
+             .allowableValues(DATA_FORMAT_BINARY, DATA_FORMAT_TEXT)
+             .build();
+     public static final PropertyDescriptor UNIQUE_FLOWFILES = new PropertyDescriptor.Builder()
+             .name("Unique FlowFiles")
+             .description("If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content, but this offers much higher throughput")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+ 
+     private List<PropertyDescriptor> descriptors;
+     private Set<Relationship> relationships;
+ 
+     private static final char[] TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ".toCharArray();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(FILE_SIZE);
+         descriptors.add(BATCH_SIZE);
+         descriptors.add(DATA_FORMAT);
+         descriptors.add(UNIQUE_FLOWFILES);
+         this.descriptors = Collections.unmodifiableList(descriptors);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return descriptors;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) {
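+         // Pre-generate the content once when FlowFiles need not be unique; unique content is generated on each trigger instead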
+         if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
+             this.data.set(null);
+         } else {
+             this.data.set(generateData(context));
+         }
+ 
+     }
+ 
+     private byte[] generateData(final ProcessContext context) {
+         final int byteCount = context.getProperty(FILE_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         final Random random = new Random();
+         final byte[] array = new byte[byteCount];
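+         // Fill the array with random bytes for the Binary format, or with random characters from TEXT_CHARS for the Text format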
+         if (context.getProperty(DATA_FORMAT).getValue().equals(DATA_FORMAT_BINARY)) {
+             random.nextBytes(array);
+         } else {
+             for (int i = 0; i < array.length; i++) {
+                 final int index = random.nextInt(TEXT_CHARS.length);
+                 array[i] = (byte) TEXT_CHARS[index];
+             }
+         }
+ 
+         return array;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
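+         // Use freshly generated content when each FlowFile must be unique; otherwise reuse the bytes cached in onScheduled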
+         final byte[] data;
+         if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
+             data = generateData(context);
+         } else {
+             data = this.data.get();
+         }
+ 
+         for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
+             FlowFile flowFile = session.create();
+             if (data.length > 0) {
+                 flowFile = session.write(flowFile, new OutputStreamCallback() {
+                     @Override
+                     public void process(final OutputStream out) throws IOException {
+                         out.write(data);
+                     }
+                 });
+             }
+ 
+             session.getProvenanceReporter().create(flowFile);
+             session.transfer(flowFile, SUCCESS);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
index 0000000,854ee37..2dabbc6
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
@@@ -1,0 -1,72 +1,72 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processors.standard.util.FTPTransfer;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+ 
+ @SideEffectFree
+ @Tags({"FTP", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+ @CapabilityDescription("Fetches files from an FTP Server and creates FlowFiles from them")
+ public class GetFTP extends GetFileTransfer {
+ 
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(FTPTransfer.HOSTNAME);
+         properties.add(FTPTransfer.PORT);
+         properties.add(FTPTransfer.USERNAME);
+         properties.add(FTPTransfer.PASSWORD);
+         properties.add(FTPTransfer.CONNECTION_MODE);
+         properties.add(FTPTransfer.TRANSFER_MODE);
+         properties.add(FTPTransfer.REMOTE_PATH);
+         properties.add(FTPTransfer.FILE_FILTER_REGEX);
+         properties.add(FTPTransfer.PATH_FILTER_REGEX);
+         properties.add(FTPTransfer.POLLING_INTERVAL);
+         properties.add(FTPTransfer.RECURSIVE_SEARCH);
+         properties.add(FTPTransfer.IGNORE_DOTTED_FILES);
+         properties.add(FTPTransfer.DELETE_ORIGINAL);
+         properties.add(FTPTransfer.CONNECTION_TIMEOUT);
+         properties.add(FTPTransfer.DATA_TIMEOUT);
+         properties.add(FTPTransfer.MAX_SELECTS);
+         properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
+         properties.add(FTPTransfer.USE_NATURAL_ORDERING);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     protected FileTransfer getFileTransfer(final ProcessContext context) {
+         return new FTPTransfer(context, getLogger());
+     }
+ }