Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:45 UTC

[21/48] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
index 0000000,be4aed6..1fe78af
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
@@@ -1,0 -1,261 +1,261 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.AllowableValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.expression.AttributeExpression.ResultType;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ /**
+  * <p>
+  * This processor routes a FlowFile based on its flow file attributes by using
+  * the Attribute Expression Language. The Expression Language is used by adding
+  * Optional Properties to the processor. The name of the Property indicates the
+  * name of the relationship to which a FlowFile will be routed if matched. The
+  * value of the Property indicates an Attribute Expression Language Expression
+  * that will be used to determine whether or not a given FlowFile will be routed
+  * to the associated relationship. If multiple expressions match a FlowFile's
+  * attributes, that FlowFile will be cloned and routed to each corresponding
+  * relationship. If none of the supplied expressions matches for a given
+  * FlowFile, that FlowFile will be routed to the 'unmatched' relationship.
+  * </p>
+  *
+  * @author unattributed
+  */
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language"})
+ @CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language")
+ public class RouteOnAttribute extends AbstractProcessor {
+ 
+     public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route";
+ 
+     // keep the word 'match' instead of 'matched' to maintain backward compatibility (there was a typo originally)
+     private static final String routeAllMatchValue = "Route to 'match' if all match";
+     private static final String routeAnyMatches = "Route to 'match' if any matches";
+     private static final String routePropertyNameValue = "Route to Property name";
+ 
+     public static final AllowableValue ROUTE_PROPERTY_NAME = new AllowableValue(
+             routePropertyNameValue,
+             "Route to Property name",
+             "A copy of the FlowFile will be routed to each relationship whose corresponding expression evaluates to 'true'"
+     );
+     public static final AllowableValue ROUTE_ALL_MATCH = new AllowableValue(
+             routeAllMatchValue,
+             "Route to 'matched' if all match",
+             "Requires that all user-defined expressions evaluate to 'true' for the FlowFile to be considered a match"
+     );
+     public static final AllowableValue ROUTE_ANY_MATCHES = new AllowableValue(
+             routeAnyMatches, // keep the word 'match' instead of 'matched' to maintain backward compatibility (there was a typo originally)
+             "Route to 'matched' if any matches",
+             "Requires that at least one user-defined expression evaluate to 'true' for the FlowFile to be considered a match"
+     );
+ 
+     public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder()
+             .name("Routing Strategy")
+             .description("Specifies how to determine which relationship to use when evaluating the Expression Language")
+             .required(true)
+             .allowableValues(ROUTE_PROPERTY_NAME, ROUTE_ALL_MATCH, ROUTE_ANY_MATCHES)
+             .defaultValue(ROUTE_PROPERTY_NAME.getValue())
+             .build();
+ 
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder()
+             .name("unmatched")
+             .description("FlowFiles that do not match any user-defined expression will be routed here")
+             .build();
+     public static final Relationship REL_MATCH = new Relationship.Builder()
+             .name("matched")
+             .description("FlowFiles will be routed to 'matched' if one or all Expressions match, depending on the configuration of the Routing Strategy property")
+             .build();
+ 
+     private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+     private List<PropertyDescriptor> properties;
+     private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
+     private volatile Set<String> dynamicPropertyNames = new HashSet<>();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> set = new HashSet<>();
+         set.add(REL_NO_MATCH);
+         relationships = new AtomicReference<>(set);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(ROUTE_STRATEGY);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships.get();
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         return new PropertyDescriptor.Builder()
+                 .required(false)
+                 .name(propertyDescriptorName)
+                 .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.BOOLEAN, false))
+                 .dynamic(true)
+                 .expressionLanguageSupported(true)
+                 .build();
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.equals(ROUTE_STRATEGY)) {
+             configuredRouteStrategy = newValue;
+         } else {
+             final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
+             if (newValue == null) {
+                 newDynamicPropertyNames.remove(descriptor.getName());
+             } else if (oldValue == null) {    // new property
+                 newDynamicPropertyNames.add(descriptor.getName());
+             }
+ 
+             this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
+         }
+ 
+         // formulate the new set of Relationships
+         final Set<String> allDynamicProps = this.dynamicPropertyNames;
+         final Set<Relationship> newRelationships = new HashSet<>();
+         final String routeStrategy = configuredRouteStrategy;
+         if (ROUTE_PROPERTY_NAME.equals(routeStrategy)) {
+             for (final String propName : allDynamicProps) {
+                 newRelationships.add(new Relationship.Builder().name(propName).build());
+             }
+         } else {
+             newRelationships.add(REL_MATCH);
+         }
+ 
+         newRelationships.add(REL_NO_MATCH);
+         this.relationships.set(newRelationships);
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
+         for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+             if (!descriptor.isDynamic()) {
+                 continue;
+             }
+ 
+             propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
+         }
+ 
+         final Set<Relationship> matchingRelationships = new HashSet<>();
+         for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
+             final PropertyValue value = entry.getValue();
+ 
+             final boolean matches = value.evaluateAttributeExpressions(flowFile).asBoolean();
+             if (matches) {
+                 matchingRelationships.add(entry.getKey());
+             }
+         }
+ 
+         final Set<Relationship> destinationRelationships = new HashSet<>();
+         switch (context.getProperty(ROUTE_STRATEGY).getValue()) {
+             case routeAllMatchValue:
+                 if (matchingRelationships.size() == propertyMap.size()) {
+                     destinationRelationships.add(REL_MATCH);
+                 } else {
+                     destinationRelationships.add(REL_NO_MATCH);
+                 }
+                 break;
+             case routeAnyMatches:
+                 if (matchingRelationships.isEmpty()) {
+                     destinationRelationships.add(REL_NO_MATCH);
+                 } else {
+                     destinationRelationships.add(REL_MATCH);
+                 }
+                 break;
+             case routePropertyNameValue:
+             default:
+                 destinationRelationships.addAll(matchingRelationships);
+                 break;
+         }
+ 
+         if (destinationRelationships.isEmpty()) {
+             logger.info(this + " routing " + flowFile + " to unmatched");
+             flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
+             session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+             session.transfer(flowFile, REL_NO_MATCH);
+         } else {
+             final Iterator<Relationship> relationshipNameIterator = destinationRelationships.iterator();
+             final Relationship firstRelationship = relationshipNameIterator.next();
+             final Map<Relationship, FlowFile> transferMap = new HashMap<>();
+             final Set<FlowFile> clones = new HashSet<>();
+ 
+             // make all the clones for any remaining relationships
+             while (relationshipNameIterator.hasNext()) {
+                 final Relationship relationship = relationshipNameIterator.next();
+                 final FlowFile cloneFlowFile = session.clone(flowFile);
+                 clones.add(cloneFlowFile);
+                 transferMap.put(relationship, cloneFlowFile);
+             }
+ 
+             // now transfer any clones generated
+             for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) {
+                 logger.info(this + " cloned " + flowFile + " into " + entry.getValue() + " and routing clone to relationship " + entry.getKey());
+                 FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
+                 session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
+                 session.transfer(updatedFlowFile, entry.getKey());
+             }
+ 
+             //now transfer the original flow file
+             logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
+             session.getProvenanceReporter().route(flowFile, firstRelationship);
+             flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName());
+             session.transfer(flowFile, firstRelationship);
+         }
+     }
+ }
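
As a concrete illustration of the dynamic-property routing described in the
javadoc above, here is a minimal usage sketch against the nifi-mock TestRunner
API; the property name "greeting" and the attribute "hello" are hypothetical
values chosen for the example, not part of this commit.

    import java.util.Collections;

    import org.apache.nifi.processors.standard.RouteOnAttribute;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class RouteOnAttributeSketch {
        public static void main(final String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(RouteOnAttribute.class);

            // Under the default "Route to Property name" strategy, each dynamic
            // property adds a relationship with the same name as the property.
            runner.setProperty("greeting", "${hello:equals('world')}");

            // Enqueue a FlowFile whose attributes satisfy the expression.
            runner.enqueue(new byte[0], Collections.singletonMap("hello", "world"));
            runner.run();

            // The expression evaluated to 'true', so the FlowFile is routed to
            // the 'greeting' relationship rather than to 'unmatched'.
            runner.assertTransferCount("greeting", 1);
            runner.assertTransferCount(RouteOnAttribute.REL_NO_MATCH.getName(), 0);
        }
    }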

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
index 0000000,cb3cff2..3e581d2
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnContent.java
@@@ -1,0 -1,232 +1,232 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.IntegerHolder;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"route", "content", "regex", "regular expression", "regexp"})
+ @CapabilityDescription("Applies Regular Expressions to the content of a FlowFile and routes a copy of the FlowFile to each "
+         + "destination whose Regular Expression matches. Regular Expressions are added as User-Defined Properties where the name "
+         + "of the property is the name of the relationship and the value is a Regular Expression to match against the FlowFile "
+         + "content. User-Defined properties do support the Attribute Expression Language, but the results are interpreted as "
+         + "literal values, not Regular Expressions")
+ public class RouteOnContent extends AbstractProcessor {
+ 
+     public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnContent.Route";
+ 
+     public static final String MATCH_ALL = "content must match exactly";
+     public static final String MATCH_SUBSEQUENCE = "content must contain match";
+ 
+     public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("Content Buffer Size")
+             .description("Specifies the maximum amount of data to buffer in order to apply the regular expressions. If the size of the FlowFile exceeds this value, any data beyond this value will be ignored")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .build();
+     public static final PropertyDescriptor MATCH_REQUIREMENT = new PropertyDescriptor.Builder()
+             .name("Match Requirement")
+             .description("Specifies whether the entire content of the file must match the regular expression exactly, or if any part of the file (up to Content Buffer Size) can contain the regular expression in order to be considered a match")
+             .required(true)
+             .allowableValues(MATCH_ALL, MATCH_SUBSEQUENCE)
+             .defaultValue(MATCH_ALL)
+             .build();
+     public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+             .name("Character Set")
+             .description("The Character Set in which the file is encoded")
+             .required(true)
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .defaultValue("UTF-8")
+             .build();
+ 
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched")
+             .description("FlowFiles that do not match any of the user-supplied regular expressions will be routed to this relationship").build();
+ 
+     private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_NO_MATCH);
+         this.relationships.set(Collections.unmodifiableSet(relationships));
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(MATCH_REQUIREMENT);
+         properties.add(CHARACTER_SET);
+         properties.add(BUFFER_SIZE);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships.get();
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+         if (propertyDescriptorName.equals(REL_NO_MATCH.getName())) {
+             return null;
+         }
+ 
+         return new PropertyDescriptor.Builder()
+                 .required(false)
+                 .name(propertyDescriptorName)
+                 .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
+                 .dynamic(true)
+                 .expressionLanguageSupported(true)
+                 .build();
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.isDynamic()) {
+             final Set<Relationship> relationships = new HashSet<>(this.relationships.get());
+             final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+ 
+             if (newValue == null) {
+                 relationships.remove(relationship);
+             } else {
+                 relationships.add(relationship);
+             }
+ 
+             this.relationships.set(relationships);
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final List<FlowFile> flowFiles = session.get(1);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final AttributeValueDecorator quoteDecorator = new AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return (attributeValue == null) ? null : Pattern.quote(attributeValue);
+             }
+         };
+ 
+         final Map<FlowFile, Set<Relationship>> flowFileDestinationMap = new HashMap<>();
+         final ProcessorLog logger = getLogger();
+ 
+         final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+         final byte[] buffer = new byte[context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue()];
+         for (final FlowFile flowFile : flowFiles) {
+             final Set<Relationship> destinations = new HashSet<>();
+             flowFileDestinationMap.put(flowFile, destinations);
+ 
+             final IntegerHolder bufferedByteCount = new IntegerHolder(0);
+             session.read(flowFile, new InputStreamCallback() {
+                 @Override
+                 public void process(final InputStream in) throws IOException {
+                     bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false));
+                 }
+             });
+ 
+             final String contentString = new String(buffer, 0, bufferedByteCount.get(), charset);
+ 
+             for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+                 if (!descriptor.isDynamic()) {
+                     continue;
+                 }
+ 
+                 final String regex = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile, quoteDecorator).getValue();
+                 final Pattern pattern = Pattern.compile(regex);
+                 final boolean matches;
+                 if (context.getProperty(MATCH_REQUIREMENT).getValue().equalsIgnoreCase(MATCH_ALL)) {
+                     matches = pattern.matcher(contentString).matches();
+                 } else {
+                     matches = pattern.matcher(contentString).find();
+                 }
+ 
+                 if (matches) {
+                     final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+                     destinations.add(relationship);
+                 }
+             }
+         }
+ 
+         for (final Map.Entry<FlowFile, Set<Relationship>> entry : flowFileDestinationMap.entrySet()) {
+             FlowFile flowFile = entry.getKey();
+             final Set<Relationship> destinations = entry.getValue();
+ 
+             if (destinations.isEmpty()) {
+                 flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
+                 session.transfer(flowFile, REL_NO_MATCH);
+                 session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+                 logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+             } else {
+                 final Relationship firstRelationship = destinations.iterator().next();
+                 destinations.remove(firstRelationship);
+ 
+                 for (final Relationship relationship : destinations) {
+                     FlowFile clone = session.clone(flowFile);
+                     clone = session.putAttribute(clone, ROUTE_ATTRIBUTE_KEY, relationship.getName());
+                     session.getProvenanceReporter().route(clone, relationship);
+                     session.transfer(clone, relationship);
+                     logger.info("Cloning {} to {} and routing clone to {}", new Object[]{flowFile, clone, relationship});
+                 }
+ 
+                 flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName());
+                 session.getProvenanceReporter().route(flowFile, firstRelationship);
+                 session.transfer(flowFile, firstRelationship);
+                 logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
+             }
+         }
+     }
+ }
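
A minimal sketch of the regex-per-relationship behavior described in the
CapabilityDescription above, again assuming the nifi-mock TestRunner API; the
relationship name "hasError" and the sample content are hypothetical.

    import java.nio.charset.Charset;

    import org.apache.nifi.processors.standard.RouteOnContent;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class RouteOnContentSketch {
        public static void main(final String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(RouteOnContent.class);

            // "content must contain match" applies Matcher.find() instead of
            // requiring the regex to match the entire buffered content.
            runner.setProperty(RouteOnContent.MATCH_REQUIREMENT, RouteOnContent.MATCH_SUBSEQUENCE);
            runner.setProperty("hasError", "ERROR");  // dynamic property: regex -> relationship

            runner.enqueue("2015-01-26 ERROR disk full".getBytes(Charset.forName("UTF-8")));
            runner.run();

            // The content contains a match, so the FlowFile goes to 'hasError'.
            runner.assertTransferCount("hasError", 1);
            runner.assertTransferCount(RouteOnContent.REL_NO_MATCH.getName(), 0);
        }
    }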

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
index 0000000,df13c66..6d48d02
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java
@@@ -1,0 -1,229 +1,229 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"scan", "attributes", "search", "lookup"})
+ @CapabilityDescription("Scans the specified attributes of FlowFiles, checking to see if any of their values are "
+         + "present within the specified dictionary of terms")
+ public class ScanAttribute extends AbstractProcessor {
+ 
+     public static final String MATCH_CRITERIA_ALL = "All Must Match";
+     public static final String MATCH_CRITERIA_ANY = "At Least 1 Must Match";
+ 
+     public static final PropertyDescriptor MATCHING_CRITERIA = new PropertyDescriptor.Builder()
+             .name("Match Criteria")
+             .description("If set to All Must Match, then FlowFiles will be routed to 'matched' only if all specified "
+                     + "attributes' values are found in the dictionary. If set to At Least 1 Must Match, FlowFiles will "
+                     + "be routed to 'matched' if any attribute specified is found in the dictionary")
+             .required(true)
+             .allowableValues(MATCH_CRITERIA_ANY, MATCH_CRITERIA_ALL)
+             .defaultValue(MATCH_CRITERIA_ANY)
+             .build();
+     public static final PropertyDescriptor ATTRIBUTE_PATTERN = new PropertyDescriptor.Builder()
+             .name("Attribute Pattern")
+             .description("Regular Expression that specifies the names of attributes whose values will be matched against the terms in the dictionary")
+             .required(true)
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .defaultValue(".*")
+             .build();
+     public static final PropertyDescriptor DICTIONARY_FILE = new PropertyDescriptor.Builder()
+             .name("Dictionary File")
+             .description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored.")
+             .required(true)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DICTIONARY_FILTER = new PropertyDescriptor.Builder()
+             .name("Dictionary Filter Pattern")
+             .description("A Regular Expression that will be applied to each line in the dictionary file. If the regular expression does not match the line, the line will not be included in the list of terms to search for. If a Matching Group is specified, only the portion of the term that matches that Matching Group will be used instead of the entire term. If not specified, all terms in the dictionary will be used and each term will consist of the text of the entire line in the file")
+             .required(false)
+             .addValidator(StandardValidators.createRegexValidator(0, 1, false))
+             .defaultValue(null)
+             .build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     private volatile Pattern dictionaryFilterPattern = null;
+     private volatile Pattern attributePattern = null;
+     private volatile Set<String> dictionaryTerms = null;
+     private volatile SynchronousFileWatcher fileWatcher = null;
+ 
+     public static final Relationship REL_MATCHED = new Relationship.Builder().name("matched").description("FlowFiles whose attributes are found in the dictionary will be routed to this relationship").build();
+     public static final Relationship REL_UNMATCHED = new Relationship.Builder().name("unmatched").description("FlowFiles whose attributes are not found in the dictionary will be routed to this relationship").build();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(DICTIONARY_FILE);
+         properties.add(ATTRIBUTE_PATTERN);
+         properties.add(MATCHING_CRITERIA);
+         properties.add(DICTIONARY_FILTER);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_MATCHED);
+         relationships.add(REL_UNMATCHED);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) throws IOException {
+         final String filterRegex = context.getProperty(DICTIONARY_FILTER).getValue();
+         this.dictionaryFilterPattern = (filterRegex == null) ? null : Pattern.compile(filterRegex);
+ 
+         final String attributeRegex = context.getProperty(ATTRIBUTE_PATTERN).getValue();
+         this.attributePattern = (attributeRegex.equals(".*")) ? null : Pattern.compile(attributeRegex);
+ 
+         this.dictionaryTerms = createDictionary(context);
+         this.fileWatcher = new SynchronousFileWatcher(Paths.get(context.getProperty(DICTIONARY_FILE).getValue()), new LastModifiedMonitor(), 1000L);
+     }
+ 
+     private Set<String> createDictionary(final ProcessContext context) throws IOException {
+         final Set<String> terms = new HashSet<>();
+ 
+         final File file = new File(context.getProperty(DICTIONARY_FILE).getValue());
+         try (final InputStream fis = new FileInputStream(file);
+                 final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) {
+ 
+             String line;
+             while ((line = reader.readLine()) != null) {
+                 if (line.trim().isEmpty()) {
+                     continue;
+                 }
+ 
+                 String matchingTerm = line;
+                 if (dictionaryFilterPattern != null) {
+                     final Matcher matcher = dictionaryFilterPattern.matcher(line);
+                     if (!matcher.matches()) {
+                         continue;
+                     }
+ 
+                     // Determine if we should use the entire line or only a part, depending on whether or not
+                     // a Matching Group was specified in the regex.
+                     if (matcher.groupCount() == 1) {
+                         matchingTerm = matcher.group(1);
+                     } else {
+                         matchingTerm = line;
+                     }
+                 }
+ 
+                 terms.add(matchingTerm);
+             }
+         }
+ 
+         return Collections.unmodifiableSet(terms);
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final List<FlowFile> flowFiles = session.get(50);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         try {
+             if (fileWatcher.checkAndReset()) {
+                 this.dictionaryTerms = createDictionary(context);
+             }
+         } catch (final IOException e) {
+             logger.error("Unable to reload dictionary due to {}", e);
+         }
+ 
+         final boolean matchAll = context.getProperty(MATCHING_CRITERIA).getValue().equals(MATCH_CRITERIA_ALL);
+ 
+         for (final FlowFile flowFile : flowFiles) {
+             final boolean matched = matchAll ? allMatch(flowFile, attributePattern, dictionaryTerms) : anyMatch(flowFile, attributePattern, dictionaryTerms);
+             final Relationship relationship = matched ? REL_MATCHED : REL_UNMATCHED;
+             session.getProvenanceReporter().route(flowFile, relationship);
+             session.transfer(flowFile, relationship);
+             logger.info("Transferred {} to {}", new Object[]{flowFile, relationship});
+         }
+     }
+ 
+     private boolean allMatch(final FlowFile flowFile, final Pattern attributePattern, final Set<String> dictionary) {
+         for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+             if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) {
+                 if (!dictionary.contains(entry.getValue())) {
+                     return false;
+                 }
+             }
+         }
+ 
+         return true;
+     }
+ 
+     private boolean anyMatch(final FlowFile flowFile, final Pattern attributePattern, final Set<String> dictionary) {
+         for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+             if (attributePattern == null || attributePattern.matcher(entry.getKey()).matches()) {
+                 if (dictionary.contains(entry.getValue())) {
+                     return true;
+                 }
+             }
+         }
+ 
+         return false;
+     }
+ }
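
To make the dictionary-matching behavior concrete, here is a usage sketch,
assuming the nifi-mock TestRunner API; the dictionary terms and the "color"
attribute are hypothetical.

    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.nifi.processors.standard.ScanAttribute;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class ScanAttributeSketch {
        public static void main(final String[] args) throws IOException {
            // A new-line-delimited dictionary file, as Dictionary File expects.
            final Path dictionary = Files.createTempFile("terms", ".txt");
            Files.write(dictionary, Arrays.asList("alpha", "beta"), Charset.forName("UTF-8"));

            final TestRunner runner = TestRunners.newTestRunner(ScanAttribute.class);
            runner.setProperty(ScanAttribute.DICTIONARY_FILE, dictionary.toString());

            // The default Match Criteria is "At Least 1 Must Match" and the
            // default Attribute Pattern (.*) scans every attribute.
            runner.enqueue(new byte[0], Collections.singletonMap("color", "alpha"));
            runner.run();

            // "alpha" is in the dictionary, so the FlowFile goes to 'matched'.
            runner.assertAllFlowFilesTransferred(ScanAttribute.REL_MATCHED, 1);
        }
    }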

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
index 0000000,9f53469..d9f2034
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanContent.java
@@@ -1,0 -1,292 +1,292 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.Closeable;
+ import java.io.DataInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.nio.charset.Charset;
+ import java.nio.file.Files;
+ import java.nio.file.Paths;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+ import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.search.Search;
+ import org.apache.nifi.util.search.SearchTerm;
+ import org.apache.nifi.util.search.ahocorasick.AhoCorasick;
+ import org.apache.nifi.util.search.ahocorasick.SearchState;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"aho-corasick", "scan", "content", "byte sequence", "search", "find", "dictionary"})
+ @CapabilityDescription("Scans the content of FlowFiles for terms that are found in a user-supplied dictionary. If a term is matched, the UTF-8 encoded version of the term will be added to the FlowFile using the 'matching.term' attribute")
+ public class ScanContent extends AbstractProcessor {
+ 
+     public static final String TEXT_ENCODING = "text";
+     public static final String BINARY_ENCODING = "binary";
+     public static final String MATCH_ATTRIBUTE_KEY = "matching.term";
+ 
+     public static final PropertyDescriptor DICTIONARY = new PropertyDescriptor.Builder()
+             .name("Dictionary File")
+             .description("The filename of the terms dictionary")
+             .required(true)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+             .name("Dictionary Encoding")
+             .description("Indicates how the dictionary is encoded. If 'text', dictionary terms are new-line delimited and UTF-8 encoded; if 'binary', dictionary terms are denoted by a 4-byte integer indicating the term length followed by the term itself")
+             .required(true)
+             .allowableValues(TEXT_ENCODING, BINARY_ENCODING)
+             .defaultValue(TEXT_ENCODING)
+             .build();
+ 
+     public static final Relationship REL_MATCH = new Relationship.Builder().name("matched").description("FlowFiles that match at least one term in the dictionary are routed to this relationship").build();
+     public static final Relationship REL_NO_MATCH = new Relationship.Builder().name("unmatched").description("FlowFiles that do not match any term in the dictionary are routed to this relationship").build();
+ 
+     public static final Charset UTF8 = Charset.forName("UTF-8");
+ 
+     private final AtomicReference<SynchronousFileWatcher> fileWatcherRef = new AtomicReference<>();
+     private final AtomicReference<Search<byte[]>> searchRef = new AtomicReference<>();
+     private final ReentrantLock dictionaryUpdateLock = new ReentrantLock();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(DICTIONARY);
+         properties.add(DICTIONARY_ENCODING);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_MATCH);
+         relationships.add(REL_NO_MATCH);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.equals(DICTIONARY)) {
+             fileWatcherRef.set(new SynchronousFileWatcher(Paths.get(newValue), new LastModifiedMonitor(), 60000L));
+         }
+     }
+ 
+     private boolean reloadDictionary(final ProcessContext context, final boolean force, final ProcessorLog logger) throws IOException {
+         boolean obtainedLock;
+         if (force) {
+             dictionaryUpdateLock.lock();
+             obtainedLock = true;
+         } else {
+             obtainedLock = dictionaryUpdateLock.tryLock();
+         }
+ 
+         if (obtainedLock) {
+             try {
+                 final Search<byte[]> search = new AhoCorasick<>();
+                 final Set<SearchTerm<byte[]>> terms = new HashSet<>();
+ 
+                 final InputStream inStream = Files.newInputStream(Paths.get(context.getProperty(DICTIONARY).getValue()), StandardOpenOption.READ);
+ 
+                 final TermLoader termLoader;
+                 if (context.getProperty(DICTIONARY_ENCODING).getValue().equalsIgnoreCase(TEXT_ENCODING)) {
+                     termLoader = new TextualTermLoader(inStream);
+                 } else {
+                     termLoader = new BinaryTermLoader(inStream);
+                 }
+ 
+                 try {
+                     SearchTerm<byte[]> term;
+                     while ((term = termLoader.nextTerm()) != null) {
+                         terms.add(term);
+                     }
+ 
+                     search.initializeDictionary(terms);
+                     searchRef.set(search);
+                     logger.info("Loaded search dictionary from {}", new Object[]{context.getProperty(DICTIONARY).getValue()});
+                     return true;
+                 } finally {
+                     termLoader.close();
+                 }
+             } finally {
+                 dictionaryUpdateLock.unlock();
+             }
+         } else {
+             return false;
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+         final SynchronousFileWatcher fileWatcher = fileWatcherRef.get();
+         try {
+             if (fileWatcher.checkAndReset()) {
+                 reloadDictionary(context, true, logger);
+             }
+         } catch (final IOException e) {
+             throw new ProcessException(e);
+         }
+ 
+         Search<byte[]> search = searchRef.get();
+         try {
+             if (search == null) {
+                 if (reloadDictionary(context, false, logger)) {
+                     search = searchRef.get();
+                 }
+             }
+         } catch (final IOException e) {
+             throw new ProcessException(e);
+         }
+ 
+         if (search == null) {
+             return;
+         }
+ 
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final Search<byte[]> finalSearch = search;
+         final ObjectHolder<SearchTerm<byte[]>> termRef = new ObjectHolder<>(null);
+         termRef.set(null);
+ 
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 try (final InputStream in = new BufferedInputStream(rawIn)) {
+                     final SearchState<byte[]> searchResult = finalSearch.search(in, false);
+                     if (searchResult.foundMatch()) {
+                         termRef.set(searchResult.getResults().keySet().iterator().next());
+                     }
+                 }
+             }
+         });
+ 
+         final SearchTerm<byte[]> matchingTerm = termRef.get();
+         if (matchingTerm == null) {
+             logger.info("Routing {} to 'unmatched'", new Object[]{flowFile});
+             session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+             session.transfer(flowFile, REL_NO_MATCH);
+         } else {
+             final String matchingTermString = matchingTerm.toString(UTF8);
+             logger.info("Routing {} to 'matched' because it matched term {}", new Object[]{flowFile, matchingTermString});
+             flowFile = session.putAttribute(flowFile, MATCH_ATTRIBUTE_KEY, matchingTermString);
+             session.getProvenanceReporter().route(flowFile, REL_MATCH);
+             session.transfer(flowFile, REL_MATCH);
+         }
+     }
+ 
+     private static interface TermLoader extends Closeable {
+ 
+         SearchTerm<byte[]> nextTerm() throws IOException;
+     }
+ 
+     private static class TextualTermLoader implements TermLoader {
+ 
+         private final BufferedReader reader;
+ 
+         public TextualTermLoader(final InputStream inStream) {
+             this.reader = new BufferedReader(new InputStreamReader(inStream));
+         }
+ 
+         @Override
+         public SearchTerm<byte[]> nextTerm() throws IOException {
+             final String nextLine = reader.readLine();
+             if (nextLine == null) {
+                 return null;
+             }
+             return new SearchTerm<>(nextLine.getBytes("UTF-8"));
+         }
+ 
+         @Override
+         public void close() throws IOException {
+             this.reader.close();
+         }
+     }
+ 
+     private static class BinaryTermLoader implements TermLoader {
+ 
+         private final DataInputStream inStream;
+ 
+         public BinaryTermLoader(final InputStream inStream) {
+             this.inStream = new DataInputStream(new BufferedInputStream(inStream));
+         }
+ 
+         @Override
+         public SearchTerm<byte[]> nextTerm() throws IOException {
+             inStream.mark(1);
+             final int nextByte = inStream.read();
+             if (nextByte == -1) {
+                 return null;
+             }
+ 
+             inStream.reset();
+             final int termLength = inStream.readInt();
+             final byte[] term = new byte[termLength];
+             inStream.readFully(term);
+ 
+             return new SearchTerm<>(term);
+         }
+ 
+         @Override
+         public void close() throws IOException {
+             this.inStream.close();
+         }
+     }
+ }
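
The 'binary' dictionary encoding described by the Dictionary Encoding property
is the mirror image of BinaryTermLoader above: each term is a 4-byte big-endian
length followed by the raw term bytes. A minimal writer sketch (the output file
name and terms are hypothetical):

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.charset.Charset;

    public class BinaryDictionaryWriterSketch {
        public static void main(final String[] args) throws IOException {
            try (final DataOutputStream out = new DataOutputStream(new FileOutputStream("terms.bin"))) {
                for (final String term : new String[]{"needle", "haystack"}) {
                    final byte[] bytes = term.getBytes(Charset.forName("UTF-8"));
                    out.writeInt(bytes.length); // read back by inStream.readInt()
                    out.write(bytes);           // read back by inStream.readFully(term)
                }
            }
        }
    }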

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index 0000000,1df4de6..dfdd401
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@@ -1,0 -1,163 +1,163 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"segment", "split"})
+ @CapabilityDescription("Segments a FlowFile into multiple smaller segments on byte boundaries. Each segment is given the following attributes: "
+         + "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the "
+         + "MergeContent processor in order to reconstitute the original FlowFile")
+ public class SegmentContent extends AbstractProcessor {
+ 
+     public static final String SEGMENT_ID = "segment.identifier";
+     public static final String SEGMENT_INDEX = "segment.index";
+     public static final String SEGMENT_COUNT = "segment.count";
+     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ 
+     public static final String FRAGMENT_ID = "fragment.identifier";
+     public static final String FRAGMENT_INDEX = "fragment.index";
+     public static final String FRAGMENT_COUNT = "fragment.count";
+ 
+     public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
+             .name("Segment Size")
+             .description("The maximum data size for each segment")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .required(true)
+             .build();
+ 
+     public static final Relationship REL_SEGMENTS = new Relationship.Builder()
+             .name("segments")
+             .description("All segments will be sent to this relationship. If the file was small enough that it was not "
+                     + "segmented, a copy of the original is sent to this relationship as well as to 'original'")
+             .build();
+     public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+             .name("original")
+             .description("The original FlowFile will be sent to this relationship")
+             .build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> propertyDescriptors;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SEGMENTS);
+         relationships.add(REL_ORIGINAL);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(SIZE);
+         this.propertyDescriptors = Collections.unmodifiableList(descriptors);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return propertyDescriptors;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         try {
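+             // A single random identifier ties all segments of this FlowFile together; the configured size is normalized to bytes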
+             final String segmentId = UUID.randomUUID().toString();
+             final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
+ 
+             final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ 
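+             // Content already fits in one segment: tag a clone for 'segments' and route the tagged original to 'original'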
+             if (flowFile.getSize() <= segmentSize) {
+                 flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
+                 flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
+                 flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
+                 flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
+ 
+                 flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
+                 flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
+                 flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
+ 
+                 FlowFile clone = session.clone(flowFile);
+                 session.transfer(flowFile, REL_ORIGINAL);
+                 session.transfer(clone, REL_SEGMENTS);
+                 return;
+             }
+ 
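+             // The number of segments is the ceiling of size / segmentSize, computed with integer arithmetic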
+             int totalSegments = (int) (flowFile.getSize() / segmentSize);
+             if (totalSegments * segmentSize < flowFile.getSize()) {
+                 totalSegments++;
+             }
+ 
+             final Map<String, String> segmentAttributes = new HashMap<>();
+             segmentAttributes.put(SEGMENT_ID, segmentId);
+             segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
+             segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+ 
+             segmentAttributes.put(FRAGMENT_ID, segmentId);
+             segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
+ 
+             final Set<FlowFile> segmentSet = new HashSet<>();
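+             // Each segment is a clone over a byte range of the original content, so no bytes are copied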
+             for (int i = 1; i <= totalSegments; i++) {
+                 final long segmentOffset = segmentSize * (i - 1);
+                 FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
+                 segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
+                 segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
+                 segment = session.putAllAttributes(segment, segmentAttributes);
+                 segmentSet.add(segment);
+             }
+ 
+             session.transfer(segmentSet, REL_SEGMENTS);
+             session.transfer(flowFile, REL_ORIGINAL);
+ 
+             if (totalSegments <= 10) {
+                 getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
+             } else {
+                 getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
+             }
+         } catch (final Exception e) {
+             throw new ProcessException(e);
+         }
+     }
+ 
+ }
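
A minimal sketch of exercising SegmentContent with the nifi-mock TestRunner; the
segment size and payload here are illustrative assumptions, not part of this commit:

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // A 10-byte payload with a 4-byte segment size should yield ceil(10 / 4) = 3 segments.
    final TestRunner runner = TestRunners.newTestRunner(new SegmentContent());
    runner.setProperty(SegmentContent.SIZE, "4 B");
    runner.enqueue("0123456789".getBytes());
    runner.run();

    runner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1);
    runner.assertTransferCount(SegmentContent.REL_SEGMENTS, 3);
    // Every segment carries the same fragment.count, so any one of them can be checked.
    final MockFlowFile segment = runner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS).get(0);
    segment.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "3");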

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 0000000,7e67c01..8c00a7e
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@@ -1,0 -1,260 +1,260 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.util.NaiveSearchRingBuffer;
+ import org.apache.nifi.util.Tuple;
+ 
+ import org.apache.commons.codec.binary.Hex;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"content", "split", "binary"})
+ @CapabilityDescription("Splits incoming FlowFiles by a specified byte sequence")
+ public class SplitContent extends AbstractProcessor {
+ 
+     // attribute keys
+     public static final String FRAGMENT_ID = "fragment.identifier";
+     public static final String FRAGMENT_INDEX = "fragment.index";
+     public static final String FRAGMENT_COUNT = "fragment.count";
+     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ 
+     public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder()
+             .name("Byte Sequence")
+             .description("A hex representation of bytes to look for and upon which to split the source file into separate files")
+             .addValidator(new HexStringPropertyValidator())
+             .required(true)
+             .build();
+     public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder()
+             .name("Keep Byte Sequence")
+             .description("Determines whether or not the Byte Sequence should be included at the end of each Split")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final Relationship REL_SPLITS = new Relationship.Builder()
+             .name("splits")
+             .description("All Splits will be routed to the splits relationship")
+             .build();
+     public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+             .name("original")
+             .description("The original file")
+             .build();
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
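+     // Holds the Byte Sequence property decoded to raw bytes (see onPropertyModified)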
+     private final AtomicReference<byte[]> byteSequence = new AtomicReference<>();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SPLITS);
+         relationships.add(REL_ORIGINAL);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(BYTE_SEQUENCE);
+         properties.add(KEEP_SEQUENCE);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+         if (descriptor.equals(BYTE_SEQUENCE)) {
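+             // Decode the hex string eagerly; invalid input leaves the sequence null, which onTrigger detects and rolls back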
+             try {
+                 this.byteSequence.set(Hex.decodeHex(newValue.toCharArray()));
+             } catch (final Exception e) {
+                 this.byteSequence.set(null);
+             }
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         final FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final boolean keepSequence = context.getProperty(KEEP_SEQUENCE).asBoolean();
+         final byte[] byteSequence = this.byteSequence.get();
+         if (byteSequence == null) {   // should never happen. But just in case...
+             logger.error("{} Unable to obtain Byte Sequence", new Object[]{this});
+             session.rollback();
+             return;
+         }
+ 
+         final List<Tuple<Long, Long>> splits = new ArrayList<>();
+ 
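+         // The ring buffer retains the last byteSequence.length bytes read and signals when they equal the sequence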
+         final NaiveSearchRingBuffer buffer = new NaiveSearchRingBuffer(byteSequence);
+         session.read(flowFile, new InputStreamCallback() {
+             @Override
+             public void process(final InputStream rawIn) throws IOException {
+                 long bytesRead = 0L;
+                 long startOffset = 0L;
+ 
+                 try (final InputStream in = new BufferedInputStream(rawIn)) {
+                     while (true) {
+                         final int nextByte = in.read();
+                         if (nextByte == -1) {
+                             return;
+                         }
+ 
+                         bytesRead++;
+                         boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF));
+                         if (matched) {
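+                             // A complete byte sequence just ended at bytesRead; record the split boundaries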
+                             final long splitLength;
+ 
+                             if (keepSequence) {
+                                 splitLength = bytesRead - startOffset;
+                             } else {
+                                 splitLength = bytesRead - startOffset - byteSequence.length;
+                             }
+ 
+                             splits.add(new Tuple<>(startOffset, splitLength));
+                             startOffset = bytesRead;
+                             buffer.clear();
+                         }
+                     }
+                 }
+             }
+         });
+ 
+         long lastOffsetPlusSize = -1L;
+         if (splits.isEmpty()) {
+             FlowFile clone = session.clone(flowFile);
+             session.transfer(flowFile, REL_ORIGINAL);
+             session.transfer(clone, REL_SPLITS);
+             logger.info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone});
+             return;
+         }
+ 
+         final ArrayList<FlowFile> splitList = new ArrayList<>();
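+         // Zero-length splits (adjacent byte sequences) yield no FlowFile but still advance lastOffsetPlusSize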
+         for (final Tuple<Long, Long> tuple : splits) {
+             long offset = tuple.getKey();
+             long size = tuple.getValue();
+             if (size > 0) {
+                 FlowFile split = session.clone(flowFile, offset, size);
+                 splitList.add(split);
+             }
+ 
+             lastOffsetPlusSize = offset + size;
+         }
+ 
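+         // Whatever follows the last match becomes the final split; skip past the matched sequence unless it is kept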
+         long finalSplitOffset = lastOffsetPlusSize;
+         if (!keepSequence) {
+             finalSplitOffset += byteSequence.length;
+         }
+         if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {
+             FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset);
+             splitList.add(finalSplit);
+         }
+ 
+         finishFragmentAttributes(session, flowFile, splitList);
+         session.transfer(splitList, REL_SPLITS);
+         session.transfer(flowFile, REL_ORIGINAL);
+ 
+         if (splitList.size() > 10) {
+             logger.info("Split {} into {} files", new Object[]{flowFile, splitList.size()});
+         } else {
+             logger.info("Split {} into {} files: {}", new Object[]{flowFile, splitList.size(), splitList});
+         }
+     }
+ 
+     /**
+      * Applies the fragment identifier, index, count, and original-filename attributes to each split.
+      *
+      * @param session the session to use for updating the attributes
+      * @param source the original FlowFile that was split
+      * @param splits the split FlowFiles; the list is updated in place with the attribute-bearing copies
+      */
+     private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
+         final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
+ 
+         final String fragmentId = UUID.randomUUID().toString();
+         final ArrayList<FlowFile> newList = new ArrayList<>(splits);
+         splits.clear();
+         for (int i = 1; i <= newList.size(); i++) {
+             FlowFile ff = newList.get(i - 1);
+             final Map<String, String> attributes = new HashMap<>();
+             attributes.put(FRAGMENT_ID, fragmentId);
+             attributes.put(FRAGMENT_INDEX, String.valueOf(i));
+             attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
+             attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
+             FlowFile newFF = session.putAllAttributes(ff, attributes);
+             splits.add(newFF);
+         }
+     }
+ 
+     static class HexStringPropertyValidator implements Validator {
+ 
+         @Override
+         public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) {
+             try {
+                 Hex.decodeHex(input.toCharArray());
+                 return new ValidationResult.Builder().valid(true).input(input).subject(subject).build();
+             } catch (final Exception e) {
+                 return new ValidationResult.Builder().valid(false).explanation("Not a valid Hex String").input(input).subject(subject).build();
+             }
+         }
+     }
+ }
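
As with SegmentContent above, a minimal TestRunner sketch for SplitContent; the hex
sequence and payload are illustrative assumptions only:

    // Split "abc,def,ghi" on the comma (hex 2C) without keeping the sequence: three splits expected.
    final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
    runner.setProperty(SplitContent.BYTE_SEQUENCE, "2C");
    runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");
    runner.enqueue("abc,def,ghi".getBytes());
    runner.run();

    runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
    runner.assertTransferCount(SplitContent.REL_SPLITS, 3);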