You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/26 15:16:44 UTC
[28/47] incubator-nifi git commit: NIFI-6: Rebase from develop to
include renaming of directory structure
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
index 0000000,a1fc86d..4827ee3
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
@@@ -1,0 -1,429 +1,429 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import static javax.xml.xpath.XPathConstants.NODESET;
+ import static javax.xml.xpath.XPathConstants.STRING;
+
+ import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.io.UnsupportedEncodingException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ import javax.xml.namespace.QName;
+ import javax.xml.transform.ErrorListener;
+ import javax.xml.transform.OutputKeys;
+ import javax.xml.transform.Source;
+ import javax.xml.transform.Transformer;
+ import javax.xml.transform.TransformerException;
+ import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.TransformerFactoryConfigurationError;
+ import javax.xml.transform.stream.StreamResult;
+ import javax.xml.xpath.XPathExpression;
+ import javax.xml.xpath.XPathExpressionException;
+ import javax.xml.xpath.XPathFactory;
+ import javax.xml.xpath.XPathFactoryConfigurationException;
+
+ import net.sf.saxon.lib.NamespaceConstant;
+ import net.sf.saxon.xpath.XPathEvaluator;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.EventDriven;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.SideEffectFree;
-import org.apache.nifi.processor.annotation.SupportsBatching;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.xml.sax.InputSource;
+
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"XML", "evaluate", "XPath"})
+ // User-facing summary of the processor; the routing rules described here mirror what onTrigger does.
+ @CapabilityDescription("Evaluates one or more XPaths against the content of a FlowFile. The results of those XPaths are assigned to "
+ + "FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of the "
+ + "Processor. XPaths are entered by adding user-defined properties; the name of the property maps to the Attribute "
+ + "Name into which the result will be placed (if the Destination is flowfile-attribute; otherwise, the property name is ignored). "
+ + "The value of the property must be a valid XPath expression. If the XPath evaluates to more than one node and the Return Type is "
+ + "set to 'nodeset' (either directly, or via 'auto-detect' with a Destination of "
+ + "'flowfile-content'), the FlowFile will be unmodified and will be routed to failure. If the XPath does not "
+ + "evaluate to a Node, the FlowFile will be routed to 'unmatched' without having its contents modified. If Destination is "
+ + "flowfile-attribute and the expression matches nothing, attributes will be created with empty strings as the value, and the "
+ + "FlowFile will always be routed to 'matched'")
+ public class EvaluateXPath extends AbstractProcessor {
+
+ public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; // allowable value of the Destination property
+ public static final String DESTINATION_CONTENT = "flowfile-content"; // allowable value and default of the Destination property
+ public static final String RETURN_TYPE_AUTO = "auto-detect"; // allowable value and default of the Return Type property
+ public static final String RETURN_TYPE_NODESET = "nodeset"; // allowable value of the Return Type property
+ public static final String RETURN_TYPE_STRING = "string"; // allowable value of the Return Type property
+
+ public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() // selects where XPath results are written
+ .name("Destination")
+ .description("Indicates whether the results of the XPath evaluation are written to the FlowFile content or a FlowFile attribute; if using attribute, must specify the Attribute Name property. If set to flowfile-content, only one XPath may be specified, and the property name is ignored.")
+ .required(true)
+ .allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE)
+ .defaultValue(DESTINATION_CONTENT) // customValidate requires exactly one dynamic property for this destination
+ .build();
+
+ public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor.Builder() // selects the XPath result type requested in onTrigger
+ .name("Return Type")
+ .description("Indicates the desired return type of the Xpath expressions. Selecting 'auto-detect' will set the return type to 'nodeset' for a Destination of 'flowfile-content', and 'string' for a Destination of 'flowfile-attribute'.")
+ .required(true)
+ .allowableValues(RETURN_TYPE_AUTO, RETURN_TYPE_NODESET, RETURN_TYPE_STRING)
+ .defaultValue(RETURN_TYPE_AUTO) // resolved to NODESET or STRING from the Destination at trigger time
+ .build();
+
+ public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles are routed to this relationship when the XPath is successfully evaluated and the FlowFile is modified as a result").build(); // success route
+ public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles are routed to this relationship when the XPath does not match the content of the FlowFile and the Destination is set to flowfile-content").build(); // also used by onTrigger when no attribute results were produced
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when the XPath cannot be evaluated against the content of the FlowFile; for instance, if the FlowFile is not valid XML, or if the Return Type is 'nodeset' and the XPath evaluates to multiple nodes").build(); // error route
+
+ private Set<Relationship> relationships; // unmodifiable set, assigned in init()
+ private List<PropertyDescriptor> properties; // unmodifiable list, assigned in init()
+
+ private final AtomicReference<XPathFactory> factoryRef = new AtomicReference<>(); // set by initializeXPathFactory(), read by onTrigger()
+
+ static { // runs once at class load
+ System.setProperty("javax.xml.xpath.XPathFactory:" + NamespaceConstant.OBJECT_MODEL_SAXON, // key is the factory class name plus the Saxon object-model URI
+ "net.sf.saxon.xpath.XPathFactoryImpl"); // presumably makes XPathFactory.newInstance(OBJECT_MODEL_SAXON) resolve to Saxon - confirm against JAXP lookup rules
+ }
+
+ /**
+  * Prepares the fixed, read-only collections of supported properties and relationships
+  * that the accessor methods of this processor hand out.
+  *
+  * @param context the initialization context (not used here)
+  */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+     final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+     supportedProperties.add(DESTINATION);
+     supportedProperties.add(RETURN_TYPE);
+     this.properties = Collections.unmodifiableList(supportedProperties);
+
+     final Set<Relationship> supportedRelationships = new HashSet<>();
+     supportedRelationships.add(REL_MATCH);
+     supportedRelationships.add(REL_NO_MATCH);
+     supportedRelationships.add(REL_FAILURE);
+     this.relationships = Collections.unmodifiableSet(supportedRelationships);
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+
+ final String destination = context.getProperty(DESTINATION).getValue();
+ if (DESTINATION_CONTENT.equals(destination)) {
+ int xpathCount = 0;
+
+ for (final PropertyDescriptor desc : context.getProperties().keySet()) {
+ if (desc.isDynamic()) {
+ xpathCount++;
+ }
+ }
+
+ if (xpathCount != 1) {
+ results.add(new ValidationResult.Builder().subject("XPaths").valid(false).explanation("Exactly one XPath must be set if using destination of " + DESTINATION_CONTENT).build());
+ }
+ }
+
+ return results;
+ }
+
+ /**
+  * @return the unmodifiable relationship set assembled in {@code init}
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     return this.relationships;
+ }
+
+ /**
+  * @return the unmodifiable list of static property descriptors assembled in {@code init}
+  */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     return this.properties;
+ }
+
+ /**
+  * Creates the XPath factory for the Saxon object model and publishes it for {@code onTrigger}.
+  *
+  * @throws XPathFactoryConfigurationException if no factory is available for the Saxon object model
+  */
+ @OnScheduled
+ public void initializeXPathFactory() throws XPathFactoryConfigurationException {
+     final XPathFactory saxonFactory = XPathFactory.newInstance(NamespaceConstant.OBJECT_MODEL_SAXON);
+     factoryRef.set(saxonFactory);
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .expressionLanguageSupported(false)
+ .addValidator(new XPathValidator())
+ .required(false)
+ .dynamic(true)
+ .build();
+ }
+
+ /**
+  * Pulls up to 50 FlowFiles from the session and evaluates every user-defined XPath against each one.
+  * <p>
+  * Routing per FlowFile:
+  * <ul>
+  * <li>'failure' - the content cannot be parsed, an XPath cannot be evaluated, a nodeset result holds
+  * more than one node, or a node cannot be serialized;</li>
+  * <li>'unmatched' - a nodeset result is empty, or no attribute value was produced for a Destination
+  * of flowfile-attribute;</li>
+  * <li>'matched' - otherwise, once the results were stored as attributes or as the new content.</li>
+  * </ul>
+  * Nodeset results are serialized into memory before anything is written to the FlowFile, so a
+  * serialization error leaves the content untouched when the FlowFile is routed to 'failure'.
+  *
+  * @param context supplies the configured property values
+  * @param session used to read, modify and route the FlowFiles
+  * @throws ProcessException if a configured XPath does not compile
+  * @throws IllegalStateException if the Destination or Return Type holds an unexpected value
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+     final List<FlowFile> flowFiles = session.get(50);
+     if (flowFiles.isEmpty()) {
+         return;
+     }
+
+     final ProcessorLog logger = getLogger();
+     final XPathFactory factory = factoryRef.get();
+     final XPathEvaluator xpathEvaluator = (XPathEvaluator) factory.newXPath();
+     final Map<String, XPathExpression> attributeToXPathMap = new HashMap<>();
+
+     // Compile every dynamic property once per invocation; the property name is the attribute name.
+     for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+         if (!entry.getKey().isDynamic()) {
+             continue;
+         }
+         try {
+             attributeToXPathMap.put(entry.getKey().getName(), xpathEvaluator.compile(entry.getValue()));
+         } catch (XPathExpressionException e) {
+             throw new ProcessException(e); // should not happen because we've already validated the XPath (in XPathValidator)
+         }
+     }
+
+     // "/" is evaluated against the raw stream to obtain the parsed document root for each FlowFile.
+     final XPathExpression slashExpression;
+     try {
+         slashExpression = xpathEvaluator.compile("/");
+     } catch (XPathExpressionException e) {
+         logger.error("unable to compile XPath expression due to {}", new Object[]{e});
+         session.transfer(flowFiles, REL_FAILURE);
+         return;
+     }
+
+     final String destination = context.getProperty(DESTINATION).getValue();
+     final QName returnType;
+
+     switch (context.getProperty(RETURN_TYPE).getValue()) {
+         case RETURN_TYPE_AUTO:
+             if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                 returnType = STRING;
+             } else if (DESTINATION_CONTENT.equals(destination)) {
+                 returnType = NODESET;
+             } else {
+                 throw new IllegalStateException("The only possible destinations should be CONTENT or ATTRIBUTE...");
+             }
+             break;
+         case RETURN_TYPE_NODESET:
+             returnType = NODESET;
+             break;
+         case RETURN_TYPE_STRING:
+             returnType = STRING;
+             break;
+         default:
+             throw new IllegalStateException("There are no other return types...");
+     }
+
+     flowFileLoop:
+     for (FlowFile flowFile : flowFiles) {
+         final ObjectHolder<Throwable> error = new ObjectHolder<>(null);
+         final ObjectHolder<Source> sourceRef = new ObjectHolder<>(null);
+
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 try (final InputStream in = new BufferedInputStream(rawIn)) {
+                     final List<Source> rootList = (List<Source>) slashExpression.evaluate(new InputSource(in), NODESET);
+                     sourceRef.set(rootList.get(0));
+                 } catch (final Exception e) {
+                     // Parse problems are reported through the holder so the FlowFile can be routed to 'failure'.
+                     error.set(e);
+                 }
+             }
+         });
+
+         if (error.get() != null) {
+             logger.error("unable to evaluate XPath against {} due to {}; routing to 'failure'", new Object[]{flowFile, error.get()});
+             session.transfer(flowFile, REL_FAILURE);
+             continue;
+         }
+
+         final Map<String, String> xpathResults = new HashMap<>();
+
+         for (final Map.Entry<String, XPathExpression> entry : attributeToXPathMap.entrySet()) {
+             Object result = null;
+             try {
+                 result = entry.getValue().evaluate(sourceRef.get(), returnType);
+                 if (result == null) {
+                     continue;
+                 }
+             } catch (final XPathExpressionException e) {
+                 logger.error("failed to evaluate XPath for {} for Property {} due to {}; routing to failure", new Object[]{flowFile, entry.getKey(), e});
+                 session.transfer(flowFile, REL_FAILURE);
+                 continue flowFileLoop;
+             }
+
+             if (NODESET.equals(returnType)) {
+                 final List<Source> nodeList = (List<Source>) result;
+                 if (nodeList.isEmpty()) {
+                     logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+                     session.transfer(flowFile, REL_NO_MATCH);
+                     continue flowFileLoop;
+                 } else if (nodeList.size() > 1) {
+                     logger.error("Routing {} to 'failure' because the XPath evaluated to {} XML nodes", new Object[]{
+                         flowFile, nodeList.size()});
+                     session.transfer(flowFile, REL_FAILURE);
+                     continue flowFileLoop;
+                 }
+                 final Source sourceNode = nodeList.get(0);
+
+                 // Serialize into memory first: if the transformer fails, nothing has been written yet,
+                 // so the FlowFile that goes to 'failure' still carries its original content.
+                 final byte[] serialized;
+                 try {
+                     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                     doTransform(sourceNode, baos);
+                     serialized = baos.toByteArray();
+                 } catch (TransformerException e) {
+                     error.set(e);
+                     continue;
+                 }
+
+                 if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                     try {
+                         xpathResults.put(entry.getKey(), new String(serialized, "UTF-8"));
+                     } catch (UnsupportedEncodingException e) {
+                         throw new ProcessException(e); // this REALLY shouldn't happen
+                     }
+                 } else if (DESTINATION_CONTENT.equals(destination)) {
+                     flowFile = session.write(flowFile, new OutputStreamCallback() {
+                         @Override
+                         public void process(final OutputStream rawOut) throws IOException {
+                             try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                 out.write(serialized);
+                             }
+                         }
+                     });
+                 }
+
+             } else if (STRING.equals(returnType)) {
+                 final String resultString = (String) result;
+
+                 if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                     xpathResults.put(entry.getKey(), resultString);
+                 } else if (DESTINATION_CONTENT.equals(destination)) {
+                     flowFile = session.write(flowFile, new OutputStreamCallback() {
+                         @Override
+                         public void process(final OutputStream rawOut) throws IOException {
+                             try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                                 out.write(resultString.getBytes("UTF-8"));
+                             }
+                         }
+                     });
+                 }
+             }
+         }
+
+         if (error.get() == null) {
+             if (DESTINATION_ATTRIBUTE.equals(destination)) {
+                 flowFile = session.putAllAttributes(flowFile, xpathResults);
+                 final Relationship destRel = xpathResults.isEmpty() ? REL_NO_MATCH : REL_MATCH;
+                 logger.info("Successfully evaluated XPaths against {} and found {} matches; routing to {}", new Object[]{flowFile,
+                     xpathResults.size(), destRel.getName()});
+                 session.transfer(flowFile, destRel);
+                 session.getProvenanceReporter().modifyAttributes(flowFile);
+             } else if (DESTINATION_CONTENT.equals(destination)) {
+                 logger.info("Successfully updated content for {}; routing to 'matched'", new Object[]{flowFile});
+                 session.transfer(flowFile, REL_MATCH);
+                 session.getProvenanceReporter().modifyContent(flowFile);
+             }
+         } else {
+             logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{flowFile, error.get()});
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ }
+
+ private void doTransform(final Source sourceNode, OutputStream out) throws TransformerFactoryConfigurationError, TransformerException {
+ final Transformer transformer;
+ try {
+ transformer = TransformerFactory.newInstance().newTransformer();
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ final Properties props = new Properties();
+ props.setProperty(OutputKeys.METHOD, "xml");
+ props.setProperty(OutputKeys.INDENT, "no");
+ props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ transformer.setOutputProperties(props);
+
+ final ProcessorLog logger = getLogger();
+
+ final ObjectHolder<TransformerException> error = new ObjectHolder<>(null);
+ transformer.setErrorListener(new ErrorListener() {
+ @Override
+ public void warning(final TransformerException exception) throws TransformerException {
+ logger.warn("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
+ }
+
+ @Override
+ public void error(final TransformerException exception) throws TransformerException {
+ logger.error("Encountered error from XPath Engine: ", new Object[] {exception.toString(), exception});
+ error.set(exception);
+ }
+
+ @Override
+ public void fatalError(final TransformerException exception) throws TransformerException {
+ logger.error("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
+ error.set(exception);
+ }
+ });
+
+ transformer.transform(sourceNode, new StreamResult(out));
+ if ( error.get() != null ) {
+ throw error.get();
+ }
+ }
+
+ /**
+  * Validates a dynamic property value by compiling it as an XPath expression with the Saxon evaluator.
+  */
+ private static class XPathValidator implements Validator {
+
+     /**
+      * @param subject the name of the property being validated
+      * @param input the candidate XPath expression
+      * @param validationContext the validation context (not used here)
+      * @return a valid result when the expression compiles; otherwise an invalid result whose
+      * explanation carries the compile error, or the engine initialization error
+      */
+     @Override
+     public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+         try {
+             final XPathEvaluator evaluator =
+                     (XPathEvaluator) XPathFactory.newInstance(NamespaceConstant.OBJECT_MODEL_SAXON).newXPath();
+             final String compileProblem = describeCompileProblem(evaluator, input);
+
+             return new ValidationResult.Builder()
+                     .input(input)
+                     .subject(subject)
+                     .valid(compileProblem == null)
+                     .explanation(compileProblem)
+                     .build();
+         } catch (final Exception e) {
+             return new ValidationResult.Builder()
+                     .input(input)
+                     .subject(subject)
+                     .valid(false)
+                     .explanation("Unable to initialize XPath engine due to " + e.toString())
+                     .build();
+         }
+     }
+
+     /** Returns the text of the compile failure, or {@code null} when the expression compiles. */
+     private static String describeCompileProblem(final XPathEvaluator evaluator, final String expression) {
+         try {
+             evaluator.compile(expression);
+             return null;
+         } catch (final Exception e) {
+             return e.toString();
+         }
+     }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
index 0000000,8b4ce09..3ddee83
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXQuery.java
@@@ -1,0 -1,463 +1,463 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+
+ import javax.xml.parsers.DocumentBuilderFactory;
+ import javax.xml.transform.OutputKeys;
+ import javax.xml.transform.Transformer;
+ import javax.xml.transform.TransformerConfigurationException;
+ import javax.xml.transform.TransformerException;
+ import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.TransformerFactoryConfigurationError;
+ import javax.xml.transform.sax.SAXSource;
+ import javax.xml.transform.stream.StreamResult;
+
+ import net.sf.saxon.s9api.DOMDestination;
+ import net.sf.saxon.s9api.Processor;
+ import net.sf.saxon.s9api.SaxonApiException;
+ import net.sf.saxon.s9api.XQueryCompiler;
+ import net.sf.saxon.s9api.XQueryEvaluator;
+ import net.sf.saxon.s9api.XQueryExecutable;
+ import net.sf.saxon.s9api.XdmItem;
+ import net.sf.saxon.s9api.XdmNode;
+ import net.sf.saxon.s9api.XdmValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.EventDriven;
-import org.apache.nifi.processor.annotation.SideEffectFree;
-import org.apache.nifi.processor.annotation.SupportsBatching;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.ObjectHolder;
+
+ import org.w3c.dom.Document;
+ import org.xml.sax.InputSource;
+
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"XML", "evaluate", "XPath", "XQuery", "experimental"})
+ // User-facing summary of the processor shown alongside its properties and relationships.
+ @CapabilityDescription(
+ "Evaluates one or more XQueries against the content of a FlowFile. The results of those XQueries are assigned "
+ + "to FlowFile Attributes or are written to the content of the FlowFile itself, depending on configuration of "
+ + "the Processor. XQueries are entered by adding user-defined properties; the name of the property maps to the "
+ + "Attribute Name into which the result will be placed (if the Destination is 'flowfile-attribute'; otherwise, "
+ + "the property name is ignored). The value of the property must be a valid XQuery. If the XQuery returns more "
+ + "than one result, new attributes or FlowFiles (for Destinations of 'flowfile-attribute' or 'flowfile-content' "
+ + "respectively) will be created for each result (attributes will have a '.n' one-up number appended to the "
+ + "specified attribute name). If any provided XQuery returns a result, the FlowFile(s) will be routed to "
+ + "'matched'. If no provided XQuery returns a result, the FlowFile will be routed to 'unmatched'. If the "
+ + "Destination is 'flowfile-attribute' and the XQueries match nothing, no attributes will be applied to the "
+ + "FlowFile.")
+
+ public class EvaluateXQuery extends AbstractProcessor {
+
+ public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; // allowable value of the Destination property
+ public static final String DESTINATION_CONTENT = "flowfile-content"; // allowable value and default of the Destination property
+
+ public static final String OUTPUT_METHOD_XML = "xml"; // allowable value and default of the "Output: Method" property
+ public static final String OUTPUT_METHOD_HTML = "html"; // allowable value of the "Output: Method" property
+ public static final String OUTPUT_METHOD_TEXT = "text"; // allowable value of the "Output: Method" property
+
+ public static final String UTF8 = "UTF-8"; // charset name; its use is outside the visible part of this class
+
+ /** Selects whether XQuery results replace the FlowFile content or are stored as FlowFile attributes. */
+ public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
+ .name("Destination")
+ .description(
+ "Indicates whether the results of the XQuery evaluation are written to the FlowFile content or a "
+ + "FlowFile attribute. If set to <flowfile-content>, only one XQuery may be specified and the property "
+ + "name is ignored. If set to <flowfile-attribute> and the XQuery returns more than one result, "
+ + "multiple attributes will be added to the FlowFile, each named with a '.n' one-up number appended to "
+ + "the specified attribute name")
+ .required(true)
+ .allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTE)
+ .defaultValue(DESTINATION_CONTENT)
+ .build();
+
+ public static final PropertyDescriptor XML_OUTPUT_METHOD = new PropertyDescriptor.Builder() // serialization method: xml, html or text
+ .name("Output: Method")
+ .description("Identifies the overall method that should be used for outputting a result tree.")
+ .required(true)
+ .allowableValues(OUTPUT_METHOD_XML, OUTPUT_METHOD_HTML, OUTPUT_METHOD_TEXT)
+ .defaultValue(OUTPUT_METHOD_XML)
+ .build();
+
+ public static final PropertyDescriptor XML_OUTPUT_OMIT_XML_DECLARATION = new PropertyDescriptor.Builder() // boolean-valued, defaults to "false"
+ .name("Output: Omit XML Declaration")
+ .description("Specifies whether the processor should output an XML declaration when transforming a result tree.")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("false")
+ .build();
+
+ public static final PropertyDescriptor XML_OUTPUT_INDENT = new PropertyDescriptor.Builder() // boolean-valued, defaults to "false"
+ .name("Output: Indent")
+ .description("Specifies whether the processor may add additional whitespace when outputting a result tree.")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("false")
+ .build();
+
+ public static final Relationship REL_MATCH = new Relationship.Builder() // success route
+ .name("matched")
+ .description(
+ "FlowFiles are routed to this relationship when the XQuery is successfully evaluated and the FlowFile "
+ + "is modified as a result")
+ .build();
+
+ public static final Relationship REL_NO_MATCH = new Relationship.Builder() // route for FlowFiles whose XQuery produced no result
+ .name("unmatched")
+ .description(
+ "FlowFiles are routed to this relationship when the XQuery does not match the content of the FlowFile "
+ + "and the Destination is set to flowfile-content")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder() // error route
+ .name("failure")
+ .description(
+ "FlowFiles are routed to this relationship when the XQuery cannot be evaluated against the content of "
+ + "the FlowFile.")
+ .build();
+
+ private Set<Relationship> relationships; // unmodifiable set, assigned in init()
+ private List<PropertyDescriptor> properties; // unmodifiable list, assigned in init()
+
+ /**
+  * Prepares the fixed, read-only collections of supported properties and relationships
+  * that the accessor methods of this processor hand out.
+  *
+  * @param context the initialization context (not used here)
+  */
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+     final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+     supportedProperties.add(DESTINATION);
+     supportedProperties.add(XML_OUTPUT_METHOD);
+     supportedProperties.add(XML_OUTPUT_OMIT_XML_DECLARATION);
+     supportedProperties.add(XML_OUTPUT_INDENT);
+     this.properties = Collections.unmodifiableList(supportedProperties);
+
+     final Set<Relationship> supportedRelationships = new HashSet<>();
+     supportedRelationships.add(REL_MATCH);
+     supportedRelationships.add(REL_NO_MATCH);
+     supportedRelationships.add(REL_FAILURE);
+     this.relationships = Collections.unmodifiableSet(supportedRelationships);
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+
+ final String destination = context.getProperty(DESTINATION).getValue();
+ if (DESTINATION_CONTENT.equals(destination)) {
+ int xQueryCount = 0;
+ for (final PropertyDescriptor desc : context.getProperties().keySet()) {
+ if (desc.isDynamic()) {
+ xQueryCount++;
+ }
+ }
+ if (xQueryCount != 1) {
+ results.add(new ValidationResult.Builder()
+ .subject("XQueries")
+ .valid(false)
+ .explanation("Exactly one XQuery must be set if using destination of " + DESTINATION_CONTENT)
+ .build());
+ }
+ }
+ return results;
+ }
+
+ /**
+  * @return the unmodifiable relationship set assembled in {@code init}
+  */
+ @Override
+ public Set<Relationship> getRelationships() {
+     return this.relationships;
+ }
+
+ /**
+  * @return the unmodifiable list of static property descriptors assembled in {@code init}
+  */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     return this.properties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .expressionLanguageSupported(false)
+ .addValidator(new XQueryValidator())
+ .required(false)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ final List<FlowFile> flowFileBatch = session.get(50);
+ if (flowFileBatch.isEmpty()) {
+ return;
+ }
+ final ProcessorLog logger = getLogger();
+ final Map<String, XQueryExecutable> attributeToXQueryMap = new HashMap<>();
+
+ final Processor proc = new Processor(false);
+ final XQueryCompiler comp = proc.newXQueryCompiler();
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+ if (!entry.getKey().isDynamic()) {
+ continue;
+ }
+ final XQueryExecutable exp;
+ try {
+ exp = comp.compile(entry.getValue());
+ attributeToXQueryMap.put(entry.getKey().getName(), exp);
+ } catch (SaxonApiException e) {
+ throw new ProcessException(e); // should not happen because we've already validated the XQuery (in XQueryValidator)
+ }
+ }
+
+ final XQueryExecutable slashExpression;
+ try {
+ slashExpression = comp.compile("/");
+ } catch (SaxonApiException e) {
+ logger.error("unable to compile XQuery expression due to {}", new Object[]{e});
+ session.transfer(flowFileBatch, REL_FAILURE);
+ return;
+ }
+
+ final String destination = context.getProperty(DESTINATION).getValue();
+
+ flowFileLoop:
+ for (FlowFile flowFile : flowFileBatch) {
+ if (!isScheduled()) {
+ session.rollback();
+ return;
+ }
+
+ final ObjectHolder<Throwable> error = new ObjectHolder<>(null);
+ final ObjectHolder<XdmNode> sourceRef = new ObjectHolder<>(null);
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ try (final InputStream in = new BufferedInputStream(rawIn)) {
+ XQueryEvaluator qe = slashExpression.load();
+ qe.setSource(new SAXSource(new InputSource(in)));
+ DocumentBuilderFactory dfactory = DocumentBuilderFactory.newInstance();
+ dfactory.setNamespaceAware(true);
+ Document dom = dfactory.newDocumentBuilder().newDocument();
+ qe.run(new DOMDestination(dom));
+ XdmNode rootNode = proc.newDocumentBuilder().wrap(dom);
+ sourceRef.set(rootNode);
+ } catch (final Exception e) {
+ error.set(e);
+ }
+ }
+ });
+
+ if (error.get() != null) {
+ logger.error("unable to evaluate XQuery against {} due to {}; routing to 'failure'",
+ new Object[]{flowFile, error.get()});
+ session.transfer(flowFile, REL_FAILURE);
+ continue;
+ }
+
+ final Map<String, String> xQueryResults = new HashMap<>();
+ List<FlowFile> childrenFlowFiles = new ArrayList<>();
+
+ for (final Map.Entry<String, XQueryExecutable> entry : attributeToXQueryMap.entrySet()) {
+ try {
+ XQueryEvaluator qe = entry.getValue().load();
+ qe.setContextItem(sourceRef.get());
+ XdmValue result = qe.evaluate();
+
+ if (DESTINATION_ATTRIBUTE.equals(destination)) {
+ int index = 1;
+ for (XdmItem item : result) {
+ String value = formatItem(item, context);
+ String attributeName = entry.getKey();
+ if (result.size() > 1) {
+ attributeName += "." + index++;
+ }
+ xQueryResults.put(attributeName, value);
+ }
+ } else { // if (DESTINATION_CONTENT.equals(destination)){
+ if (result.size() == 0) {
+ logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+ session.transfer(flowFile, REL_NO_MATCH);
+ continue flowFileLoop;
+ } else if (result.size() == 1) {
+ final XdmItem item = result.itemAt(0);
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+ writeformattedItem(item, context, out);
+ } catch (TransformerFactoryConfigurationError | TransformerException e) {
+ throw new IOException(e);
+ }
+ }
+ });
+ } else {
+ for (final XdmItem item : result) {
+ FlowFile ff = session.clone(flowFile);
+ ff = session.write(ff, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+ try {
+ writeformattedItem(item, context, out);
+ } catch (TransformerFactoryConfigurationError | TransformerException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ });
+ childrenFlowFiles.add(ff);
+ }
+ }
+ }
+ } catch (final SaxonApiException e) {
+ logger.error("failed to evaluate XQuery for {} for Property {} due to {}; routing to failure",
+ new Object[]{flowFile, entry.getKey(), e});
+ session.transfer(flowFile, REL_FAILURE);
+ session.remove(childrenFlowFiles);
+ continue flowFileLoop;
+ } catch (TransformerFactoryConfigurationError | TransformerException | IOException e) {
+ logger.error("Failed to write XQuery result for {} due to {}; routing original to 'failure'", new Object[]{
+ flowFile, error.get()});
+ session.transfer(flowFile, REL_FAILURE);
+ session.remove(childrenFlowFiles);
+ continue flowFileLoop;
+ }
+ }
+
+ if (DESTINATION_ATTRIBUTE.equals(destination)) {
+ flowFile = session.putAllAttributes(flowFile, xQueryResults);
+ final Relationship destRel = xQueryResults.isEmpty() ? REL_NO_MATCH : REL_MATCH;
+ logger.info("Successfully evaluated XQueries against {} and found {} matches; routing to {}",
+ new Object[]{flowFile, xQueryResults.size(), destRel.getName()});
+ session.transfer(flowFile, destRel);
+ session.getProvenanceReporter().modifyAttributes(flowFile);
+ } else { // if (DESTINATION_CONTENT.equals(destination)) {
+ if (!childrenFlowFiles.isEmpty()) {
+ logger.info("Successfully created {} new FlowFiles from {}; routing all to 'matched'",
+ new Object[]{childrenFlowFiles.size(), flowFile});
+ session.transfer(childrenFlowFiles, REL_MATCH);
+ session.remove(flowFile);
+ } else {
+ logger.info("Successfully updated content for {}; routing to 'matched'", new Object[]{flowFile});
+ session.transfer(flowFile, REL_MATCH);
+ session.getProvenanceReporter().modifyContent(flowFile);
+ }
+ }
+ } // end flowFileLoop
+ }
+
+ private String formatItem(XdmItem item, ProcessContext context) throws TransformerConfigurationException,
+ TransformerFactoryConfigurationError, TransformerException, IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ writeformattedItem(item, context, baos);
+ return baos.toString();
+ }
+
+     /**
+      * Writes one XQuery result item to the given stream.
+      * <p>
+      * Atomic values, and nodes that are neither documents nor elements, are written as
+      * the UTF8 bytes of their string value. Document and element nodes are serialized
+      * through a JAXP {@link Transformer} configured from the processor's output properties.
+      *
+      * @param item the result item to write
+      * @param context the process context supplying the serialization properties
+      * @param out the stream that receives the serialized item
+      * @throws TransformerException if serializing a node fails
+      * @throws IOException if writing to {@code out} fails
+      */
+     void writeformattedItem(XdmItem item, ProcessContext context, OutputStream out) throws TransformerConfigurationException,
+             TransformerFactoryConfigurationError, TransformerException, IOException {
+
+         // Atomic values carry no markup; emit their text and stop.
+         if (item.isAtomicValue()) {
+             out.write(item.getStringValue().getBytes(UTF8));
+             return;
+         }
+
+         // Anything that is not atomic is an XdmNode.
+         final XdmNode xmlNode = (XdmNode) item;
+         switch (xmlNode.getNodeKind()) {
+             case DOCUMENT:
+             case ELEMENT: {
+                 final Transformer serializer = TransformerFactory.newInstance().newTransformer();
+                 serializer.setOutputProperties(getTransformerProperties(context));
+                 serializer.transform(xmlNode.asSource(), new StreamResult(out));
+                 return;
+             }
+             default:
+                 out.write(xmlNode.getStringValue().getBytes(UTF8));
+         }
+     }
+
+     /**
+      * Reads the output method, indent flag and omit-declaration flag from the processor
+      * configuration and converts them into JAXP output properties.
+      *
+      * @param context the process context holding the configured property values
+      * @return the output properties to apply to a {@code Transformer}
+      */
+     private Properties getTransformerProperties(ProcessContext context) {
+         return getTransformerProperties(
+                 context.getProperty(XML_OUTPUT_METHOD).getValue(),
+                 context.getProperty(XML_OUTPUT_INDENT).asBoolean(),
+                 context.getProperty(XML_OUTPUT_OMIT_XML_DECLARATION).asBoolean());
+     }
+
+     /**
+      * Builds the JAXP output properties for the given serialization settings.
+      *
+      * @param method value for {@link OutputKeys#METHOD}
+      * @param indent whether {@link OutputKeys#INDENT} is set to "yes" (otherwise "no")
+      * @param omitDeclaration whether {@link OutputKeys#OMIT_XML_DECLARATION} is set to "yes" (otherwise "no")
+      * @return a new {@code Properties} holding exactly these three keys
+      */
+     static Properties getTransformerProperties(final String method, final boolean indent, final boolean omitDeclaration) {
+         final String indentFlag = indent ? "yes" : "no";
+         final String omitFlag = omitDeclaration ? "yes" : "no";
+
+         final Properties outputProps = new Properties();
+         outputProps.setProperty(OutputKeys.METHOD, method);
+         outputProps.setProperty(OutputKeys.INDENT, indentFlag);
+         outputProps.setProperty(OutputKeys.OMIT_XML_DECLARATION, omitFlag);
+         return outputProps;
+     }
+
+     /**
+      * Validator that accepts a property value only if it compiles as an XQuery expression.
+      */
+     private static class XQueryValidator implements Validator {
+
+         /**
+          * Attempts to compile {@code input} with a fresh Saxon compiler.
+          *
+          * @return a valid result when compilation succeeds; otherwise an invalid result whose
+          *         explanation is the string form of the exception that was raised
+          */
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             try {
+                 final XQueryCompiler compiler = new Processor(false).newXQueryCompiler();
+
+                 // Null means the expression compiled cleanly.
+                 String compileFailure;
+                 try {
+                     compiler.compile(input);
+                     compileFailure = null;
+                 } catch (final Exception e) {
+                     compileFailure = e.toString();
+                 }
+
+                 final boolean compiled = compileFailure == null;
+                 return new ValidationResult.Builder()
+                         .input(input)
+                         .subject(subject)
+                         .valid(compiled)
+                         .explanation(compileFailure)
+                         .build();
+             } catch (final Exception e) {
+                 // Reached when the engine itself (or building the result) fails, not the query.
+                 return new ValidationResult.Builder()
+                         .input(input)
+                         .subject(subject)
+                         .valid(false)
+                         .explanation("Unable to initialize XQuery engine due to " + e.toString())
+                         .build();
+             }
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 0000000,ab0b2aa..dda3647
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@@ -1,0 -1,358 +1,358 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.lang.ProcessBuilder.Redirect;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeExpression.ResultType;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.EventDriven;
-import org.apache.nifi.processor.annotation.SupportsBatching;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.lang3.StringUtils;
+
+ /**
+ * <p>
+ * This processor executes an external command on the contents of a flow file,
+ * and creates a new flow file with the results of the command.
+ * </p>
+ * <p>
+ * <strong>Properties:</strong>
+ * </p>
+ * <ul>
+ * <li><strong>Command Path</strong>
+ * <ul>
+ * <li>Specifies the command to be executed; if just the name of an executable
+ * is provided, it must be in the user's environment PATH.</li>
+ * <li>Default value: none</li>
+ * <li>Supports expression language: true</li>
+ * </ul>
+ * </li>
+ * <li>Command Arguments
+ * <ul>
+ * <li>The arguments to supply to the executable delimited by the ';' character.
+ * Each argument may be an Expression Language statement.</li>
+ * <li>Default value: none</li>
+ * <li>Supports expression language: true</li>
+ * </ul>
+ * </li>
+ * <li>Working Directory
+ * <ul>
+ * <li>The directory to use as the current working directory when executing the
+ * command</li>
+ * <li>Default value: none (which means whatever NiFi's current working
+ * directory is...probably the root of the NiFi installation directory.)</li>
+ * <li>Supports expression language: true</li>
+ * </ul>
+ * </li>
+ *
+ * </ul>
+ *
+ * <p>
+ * <strong>Relationships:</strong>
+ * </p>
+ * <ul>
+ * <li>original
+ * <ul>
+ * <li>The destination path for the original incoming flow file</li>
+ * </ul>
+ * </li>
+ * <li>output stream
+ * <ul>
+ * <li>The destination path for the flow file created from the command's
+ * output</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p>
+ *
+ * @author unattributed
+ */
+ @EventDriven
+ @SupportsBatching
+ @Tags({"command execution", "command", "stream", "execute"})
+ @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
+ public class ExecuteStreamCommand extends AbstractProcessor {
+
+     /** Relationship named "original". */
+     public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()
+             .name("original")
+             .description("FlowFiles that were successfully processed")
+             .build();
+
+     /** Relationship named "output stream", for the FlowFile created from the command's output. */
+     public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder()
+             .name("output stream")
+             .description("The destination path for the flow file created from the command's output")
+             .build();
+
+     /** Unmodifiable set of every relationship this processor exposes. */
+     private static final Set<Relationship> RELATIONSHIPS;
+
+     static {
+         final Set<Relationship> supported = new HashSet<>();
+         supported.add(OUTPUT_STREAM_RELATIONSHIP);
+         supported.add(ORIGINAL_RELATIONSHIP);
+         RELATIONSHIPS = Collections.unmodifiableSet(supported);
+     }
+
+     /** Validates a single value as a String-typed attribute expression. */
+     private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR =
+             StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true);
+
+     /** Required property naming the executable to run. */
+     static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
+             .name("Command Path")
+             .description(
+                     "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+             .expressionLanguageSupported(true)
+             .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+             .required(true)
+             .build();
+
+     /** Property holding the ';'-delimited argument list; each argument is validated on its own. */
+     static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder()
+             .name("Command Arguments")
+             .description("The arguments to supply to the executable delimited by the ';' character.")
+             .expressionLanguageSupported(true)
+             .addValidator(new Validator() {
+
+                 /**
+                  * Splits the value on ';' and returns the first invalid per-argument result,
+                  * or a valid result for the whole input when every argument passes.
+                  */
+                 @Override
+                 public ValidationResult validate(String subject, String input, ValidationContext context) {
+                     for (final String argument : input.split(";")) {
+                         final ValidationResult argumentResult =
+                                 ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, argument, context);
+                         if (!argumentResult.isValid()) {
+                             return argumentResult;
+                         }
+                     }
+                     return new ValidationResult.Builder()
+                             .subject(subject)
+                             .valid(true)
+                             .input(input)
+                             .build();
+                 }
+             })
+             .build();
+
+     /** Optional property naming the directory the command is executed in. */
+     static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
+             .name("Working Directory")
+             .description("The directory to use as the current working directory when executing the command")
+             .expressionLanguageSupported(true)
+             .addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
+             .required(false)
+             .build();
+
+     /** Unmodifiable list of the supported properties: arguments, command, working directory. */
+     private static final List<PropertyDescriptor> PROPERTIES;
+
+     static {
+         final List<PropertyDescriptor> supported = new ArrayList<>();
+         supported.add(EXECUTION_ARGUMENTS);
+         supported.add(EXECUTION_COMMAND);
+         supported.add(WORKING_DIR);
+         PROPERTIES = Collections.unmodifiableList(supported);
+     }
+
+     /** Processor logger; assigned in {@link #init}. */
+     private ProcessorLog logger;
+
+     /**
+      * @return the fixed set of relationships built in the static initializer
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return RELATIONSHIPS;
+     }
+
+     /**
+      * Stores the processor's logger in the {@code logger} field.
+      */
+     @Override
+     protected void init(ProcessorInitializationContext context) {
+         this.logger = getLogger();
+     }
+
+     /**
+      * @return the fixed list of property descriptors built in the static initializer
+      */
+     @Override
+     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return PROPERTIES;
+     }
+
+     /**
+      * Runs the configured command for one incoming FlowFile.
+      * <p>
+      * The FlowFile content is piped to the process's standard input, and the process's
+      * standard output becomes the content of a new child FlowFile. Both FlowFiles receive the
+      * {@code execution.*} attributes (error text truncated to 4000 characters, exit status,
+      * command and arguments). The child is routed to 'output stream' and the incoming
+      * FlowFile to 'original'.
+      *
+      * @param context supplies the command, argument and working directory properties
+      * @param session the session used to read, create and transfer FlowFiles
+      * @throws ProcessException if the external process cannot be started
+      */
+     @Override
+     public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (null == flowFile) {
+             return;
+         }
+
+         final List<String> args = new ArrayList<>();
+         final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
+         args.add(executeCommand);
+         final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
+         if (!StringUtils.isBlank(commandArguments)) {
+             // Each ';'-delimited argument is evaluated against the FlowFile's attributes separately.
+             for (String arg : commandArguments.split(";")) {
+                 args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
+             }
+         }
+         final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue();
+
+         final ProcessBuilder builder = new ProcessBuilder();
+
+         logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
+         File dir = null;
+         if (!StringUtils.isBlank(workingDir)) {
+             dir = new File(workingDir);
+             if (!dir.exists() && !dir.mkdirs()) {
+                 logger.warn("Failed to create working directory {}, using current working directory {}",
+                         new Object[]{workingDir, System.getProperty("user.dir")});
+                 // Actually fall back as the message says: a null directory makes ProcessBuilder
+                 // use the current working directory, whereas the missing one would make start() fail.
+                 dir = null;
+             }
+         }
+         builder.command(args);
+         builder.directory(dir);
+         builder.redirectInput(Redirect.PIPE);
+         builder.redirectOutput(Redirect.PIPE);
+         final Process process;
+         try {
+             process = builder.start();
+         } catch (IOException e) {
+             logger.error("Could not create external process to run command", e);
+             throw new ProcessException(e);
+         }
+         try (final OutputStream pos = process.getOutputStream();
+                 final InputStream pis = process.getInputStream();
+                 final InputStream pes = process.getErrorStream();
+                 final BufferedInputStream bis = new BufferedInputStream(pis);
+                 final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
+             int exitCode = -1;
+             final BufferedOutputStream bos = new BufferedOutputStream(pos);
+             FlowFile outputStreamFlowFile = session.create(flowFile);
+             // The callback feeds the FlowFile to stdin, copies stdout into the child FlowFile
+             // and waits for the process to exit.
+             StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
+             session.read(flowFile, callback);
+             outputStreamFlowFile = callback.outputStreamFlowFile;
+             exitCode = callback.exitCode;
+             logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
+
+             Map<String, String> attributes = new HashMap<>();
+
+             // Standard error is only drained here, after the process has finished.
+             final StringBuilder strBldr = new StringBuilder();
+             try {
+                 String line;
+                 while ((line = bufferedReader.readLine()) != null) {
+                     strBldr.append(line).append("\n");
+                 }
+             } catch (IOException e) {
+                 strBldr.append("Unknown...could not read Process's Std Error");
+             }
+             // Cap the attribute value at 4000 characters.
+             int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
+             attributes.put("execution.error", strBldr.substring(0, length));
+
+             if (exitCode == 0) {
+                 logger.info("Transferring flow file {} to 'output stream'", new Object[]{outputStreamFlowFile});
+             } else {
+                 logger.error("Transferring flow file {} to 'output stream'. Executable command {} ended in an error: {}",
+                         new Object[]{outputStreamFlowFile, executeCommand, strBldr.toString()});
+             }
+
+             attributes.put("execution.status", Integer.toString(exitCode));
+             attributes.put("execution.command", executeCommand);
+             attributes.put("execution.command.args", commandArguments);
+             outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
+             session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
+             logger.info("Transferring flow file {} to original", new Object[]{flowFile});
+             flowFile = session.putAllAttributes(flowFile, attributes);
+             session.transfer(flowFile, ORIGINAL_RELATIONSHIP);
+
+         } catch (final IOException ex) {
+             // could not close Process related streams
+             logger.warn("Problem terminating Process {}", new Object[]{process}, ex);
+         } finally {
+             process.destroy(); // last ditch effort to clean up that process.
+         }
+     }
+
+     /**
+      * Callback that connects one FlowFile to a running process: the FlowFile content is
+      * copied to the process's standard input on a separate daemon thread, while the calling
+      * thread copies the process's standard output into {@link #outputStreamFlowFile} and
+      * then waits for the process to exit.
+      */
+     static class StdInWriterCallback implements InputStreamCallback {
+
+         final OutputStream stdInWritable;
+         final InputStream stdOutReadable;
+         final ProcessorLog logger;
+         final ProcessSession session;
+         final Process process;
+         /** Replaced by the FlowFile returned from {@code session.write} once output is captured. */
+         FlowFile outputStreamFlowFile;
+         /**
+          * Exit code of the process. Starts at -1 so that an interrupted wait is never
+          * mistaken for a successful (0) exit.
+          */
+         int exitCode = -1;
+
+         public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session,
+                 FlowFile outputStreamFlowFile, Process process) {
+             this.stdInWritable = stdInWritable;
+             this.stdOutReadable = stdOutReadable;
+             this.logger = logger;
+             this.session = session;
+             this.outputStreamFlowFile = outputStreamFlowFile;
+             this.process = process;
+         }
+
+         /**
+          * Streams the incoming FlowFile to the process and captures the process output.
+          *
+          * @param incomingFlowFileIS content of the FlowFile being processed
+          * @throws IOException if copying the process output fails
+          */
+         @Override
+         public void process(final InputStream incomingFlowFileIS) throws IOException {
+             outputStreamFlowFile = session.write(outputStreamFlowFile, new OutputStreamCallback() {
+
+                 @Override
+                 public void process(OutputStream out) throws IOException {
+                     // stdin is fed on its own thread while this thread reads stdout concurrently.
+                     Thread writerThread = new Thread(new Runnable() {
+
+                         @Override
+                         public void run() {
+                             try {
+                                 StreamUtils.copy(incomingFlowFileIS, stdInWritable);
+                             } catch (IOException e) {
+                                 logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
+                             }
+                             // MUST close the output stream to the stdIn so that whatever is reading knows
+                             // there is no more data
+                             IOUtils.closeQuietly(stdInWritable);
+                         }
+                     });
+                     writerThread.setDaemon(true);
+                     writerThread.start();
+                     StreamUtils.copy(stdOutReadable, out);
+                     try {
+                         exitCode = process.waitFor();
+                     } catch (InterruptedException e) {
+                         logger.warn("Command Execution Process was interrupted", e);
+                         // exitCode stays -1; restore the flag so callers can observe the interrupt.
+                         Thread.currentThread().interrupt();
+                     }
+                 }
+             });
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
index 0000000,6f18a01..0ae4747
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
@@@ -1,0 -1,164 +1,164 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Random;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.SupportsBatching;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+
+ @SupportsBatching
+ @Tags({"test", "random", "generate"})
+ @CapabilityDescription("This processor creates FlowFiles of random data and is used for load testing")
+ public class GenerateFlowFile extends AbstractProcessor {
+
+ // Content prepared in onScheduled: the shared bytes when "Unique FlowFiles" is false, null when it is true.
+ private final AtomicReference<byte[]> data = new AtomicReference<>();
+
+ // Allowable values of the "Data Format" property.
+ public static final String DATA_FORMAT_BINARY = "Binary";
+ public static final String DATA_FORMAT_TEXT = "Text";
+
+ // Size of the content of each generated FlowFile; read as a byte count in generateData.
+ public static final PropertyDescriptor FILE_SIZE = new PropertyDescriptor.Builder()
+ .name("File Size")
+ .description("The size of the file that will be used")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+ // Number of FlowFiles created per onTrigger invocation; defaults to 1.
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The number of FlowFiles to be transferred in each invocation")
+ .required(true)
+ .defaultValue("1")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+ // Selects random bytes (Binary, the default) or characters drawn from TEXT_CHARS (Text).
+ public static final PropertyDescriptor DATA_FORMAT = new PropertyDescriptor.Builder()
+ .name("Data Format")
+ .description("Specifies whether the data should be Text or Binary")
+ .required(true)
+ .defaultValue(DATA_FORMAT_BINARY)
+ .allowableValues(DATA_FORMAT_BINARY, DATA_FORMAT_TEXT)
+ .build();
+ // Chooses between freshly generated content and the content cached at schedule time; defaults to false.
+ public static final PropertyDescriptor UNIQUE_FLOWFILES = new PropertyDescriptor.Builder()
+ .name("Unique FlowFiles")
+ .description("If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ // The only relationship; every generated FlowFile is transferred to it.
+ public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+
+ // Unmodifiable views populated in init().
+ private List<PropertyDescriptor> descriptors;
+ private Set<Relationship> relationships;
+
+ // Characters that Text-format content is drawn from; each is narrowed to a single byte in generateData.
+ private static final char[] TEXT_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t ".toCharArray();
+
+     /**
+      * Builds the unmodifiable relationship set and property descriptor list returned by
+      * {@code getRelationships()} and {@code getSupportedPropertyDescriptors()}.
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> supportedRelationships = new HashSet<>();
+         supportedRelationships.add(SUCCESS);
+         this.relationships = Collections.unmodifiableSet(supportedRelationships);
+
+         final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
+         supportedProperties.add(FILE_SIZE);
+         supportedProperties.add(BATCH_SIZE);
+         supportedProperties.add(DATA_FORMAT);
+         supportedProperties.add(UNIQUE_FLOWFILES);
+         this.descriptors = Collections.unmodifiableList(supportedProperties);
+     }
+
+     /**
+      * @return the property descriptor list assembled in {@code init}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.descriptors;
+     }
+
+     /**
+      * @return the relationship set assembled in {@code init}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+
+     /**
+      * Prepares the cached content when the processor is scheduled: cleared when unique
+      * FlowFiles are requested, otherwise generated once for reuse.
+      */
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) {
+         final boolean uniquePerFlowFile = context.getProperty(UNIQUE_FLOWFILES).asBoolean();
+         this.data.set(uniquePerFlowFile ? null : generateData(context));
+     }
+
+     /**
+      * Creates a new array of random content whose length is the configured file size in bytes.
+      *
+      * @param context supplies the File Size and Data Format properties
+      * @return random bytes for the Binary format, otherwise bytes of characters picked at
+      *         random from {@code TEXT_CHARS}
+      */
+     private byte[] generateData(final ProcessContext context) {
+         final int size = context.getProperty(FILE_SIZE).asDataSize(DataUnit.B).intValue();
+
+         final Random rng = new Random();
+         final byte[] content = new byte[size];
+
+         final boolean binary = context.getProperty(DATA_FORMAT).getValue().equals(DATA_FORMAT_BINARY);
+         if (binary) {
+             rng.nextBytes(content);
+             return content;
+         }
+
+         for (int pos = 0; pos < content.length; pos++) {
+             final char picked = TEXT_CHARS[rng.nextInt(TEXT_CHARS.length)];
+             content[pos] = (byte) picked;
+         }
+         return content;
+     }
+
+     /**
+      * Creates "Batch Size" FlowFiles and transfers each to {@code SUCCESS}.
+      * <p>
+      * When "Unique FlowFiles" is true, fresh content is generated for every FlowFile, so the
+      * FlowFiles within one batch differ as well. Otherwise all FlowFiles share the content
+      * cached by {@code onScheduled}.
+      *
+      * @param context supplies the Unique FlowFiles and Batch Size properties
+      * @param session the session used to create, write and transfer FlowFiles
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final boolean uniqueContent = context.getProperty(UNIQUE_FLOWFILES).asBoolean();
+         final byte[] sharedData = uniqueContent ? null : this.data.get();
+         // Read once; the configured value does not change while this invocation runs.
+         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+         for (int i = 0; i < batchSize; i++) {
+             // Generate per FlowFile, not per invocation, so "unique" holds inside a batch too.
+             final byte[] content = uniqueContent ? generateData(context) : sharedData;
+
+             FlowFile flowFile = session.create();
+             if (content.length > 0) {
+                 flowFile = session.write(flowFile, new OutputStreamCallback() {
+                     @Override
+                     public void process(final OutputStream out) throws IOException {
+                         out.write(content);
+                     }
+                 });
+             }
+
+             session.getProvenanceReporter().create(flowFile);
+             session.transfer(flowFile, SUCCESS);
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
index 0000000,854ee37..2dabbc6
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
@@@ -1,0 -1,72 +1,72 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processors.standard;
+
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.annotation.CapabilityDescription;
-import org.apache.nifi.processor.annotation.SideEffectFree;
-import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processors.standard.util.FTPTransfer;
+ import org.apache.nifi.processors.standard.util.FileTransfer;
+
+ /**
+  * {@link GetFileTransfer} variant that uses {@link FTPTransfer} as its transfer
+  * implementation and exposes the FTP-related properties.
+  */
+ @SideEffectFree
+ @Tags({"FTP", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+ @CapabilityDescription("Fetches files from an FTP Server and creates FlowFiles from them")
+ public class GetFTP extends GetFileTransfer {
+
+     /** Unmodifiable list of supported properties, assembled in {@link #init}. */
+     private List<PropertyDescriptor> properties;
+
+     /**
+      * Registers the supported property descriptors in a fixed order.
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+
+         // Connection settings
+         descriptors.add(FTPTransfer.HOSTNAME);
+         descriptors.add(FTPTransfer.PORT);
+         descriptors.add(FTPTransfer.USERNAME);
+         descriptors.add(FTPTransfer.PASSWORD);
+         descriptors.add(FTPTransfer.CONNECTION_MODE);
+         descriptors.add(FTPTransfer.TRANSFER_MODE);
+
+         // Remote location, filtering and polling
+         descriptors.add(FTPTransfer.REMOTE_PATH);
+         descriptors.add(FTPTransfer.FILE_FILTER_REGEX);
+         descriptors.add(FTPTransfer.PATH_FILTER_REGEX);
+         descriptors.add(FTPTransfer.POLLING_INTERVAL);
+         descriptors.add(FTPTransfer.RECURSIVE_SEARCH);
+         descriptors.add(FTPTransfer.IGNORE_DOTTED_FILES);
+         descriptors.add(FTPTransfer.DELETE_ORIGINAL);
+
+         // Timeouts, batch sizes and ordering
+         descriptors.add(FTPTransfer.CONNECTION_TIMEOUT);
+         descriptors.add(FTPTransfer.DATA_TIMEOUT);
+         descriptors.add(FTPTransfer.MAX_SELECTS);
+         descriptors.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
+         descriptors.add(FTPTransfer.USE_NATURAL_ORDERING);
+
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+
+     /**
+      * @return the property descriptor list assembled in {@link #init}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+
+     /**
+      * @return a new {@link FTPTransfer} built from the given context and this processor's logger
+      */
+     @Override
+     protected FileTransfer getFileTransfer(final ProcessContext context) {
+         return new FTPTransfer(context, getLogger());
+     }
+ }