Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:46 UTC

[22/48] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
index 0000000,6db2757..70ac5ac
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
@@@ -1,0 -1,367 +1,367 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.nio.file.attribute.PosixFileAttributeView;
+ import java.nio.file.attribute.PosixFilePermissions;
+ import java.nio.file.attribute.UserPrincipalLookupService;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ 
+ @SupportsBatching
+ @Tags({"put", "local", "copy", "archive", "files", "filesystem"})
+ @CapabilityDescription("Writes the contents of a FlowFile to the local file system")
+ public class PutFile extends AbstractProcessor {
+ 
+     public static final String REPLACE_RESOLUTION = "replace";
+     public static final String IGNORE_RESOLUTION = "ignore";
+     public static final String FAIL_RESOLUTION = "fail";
+ 
+     public static final String FILE_MODIFY_DATE_ATTRIBUTE = "file.lastModifiedTime";
+     public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+ 
+     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+             .name("Directory")
+             .description("The directory to which files should be written. You may use expression language such as /aa/bb/${path}")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor MAX_DESTINATION_FILES = new PropertyDescriptor.Builder()
+             .name("Maximum File Count")
+             .description("Specifies the maximum number of files that can exist in the output directory")
+             .required(false)
+             .addValidator(StandardValidators.INTEGER_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+             .name("Conflict Resolution Strategy")
+             .description("Indicates what should happen when a file with the same name already exists in the output directory")
+             .required(true)
+             .defaultValue(FAIL_RESOLUTION)
+             .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION)
+             .build();
+     public static final PropertyDescriptor CHANGE_LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
+             .name("Last Modified Time")
+             .description("Sets the lastModifiedTime on the output file to the value of this attribute.  Format must be yyyy-MM-dd'T'HH:mm:ssZ.  You may also use expression language such as ${file.lastModifiedTime}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor CHANGE_PERMISSIONS = new PropertyDescriptor.Builder()
+             .name("Permissions")
+             .description("Sets the permissions on the output file to the value of this attribute.  Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644).  You may also use expression language such as ${file.permissions}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor CHANGE_OWNER = new PropertyDescriptor.Builder()
+             .name("Owner")
+             .description("Sets the owner on the output file to the value of this attribute.  You may also use expression language such as ${file.owner}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor CHANGE_GROUP = new PropertyDescriptor.Builder()
+             .name("Group")
+             .description("Sets the group on the output file to the value of this attribute.  You may also use expression language such as ${file.group}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor CREATE_DIRS = new PropertyDescriptor.Builder()
+             .name("Create Missing Directories")
+             .description("If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure.")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final int MAX_FILE_LOCK_ATTEMPTS = 10;
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Files that have been successfully written to the output directory are transferred to this relationship").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Files that could not be written to the output directory for some reason are transferred to this relationship").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         // relationships
+         final Set<Relationship> procRels = new HashSet<>();
+         procRels.add(REL_SUCCESS);
+         procRels.add(REL_FAILURE);
+         relationships = Collections.unmodifiableSet(procRels);
+ 
+         // descriptors
+         final List<PropertyDescriptor> supDescriptors = new ArrayList<>();
+         supDescriptors.add(DIRECTORY);
+         supDescriptors.add(CONFLICT_RESOLUTION);
+         supDescriptors.add(CREATE_DIRS);
+         supDescriptors.add(MAX_DESTINATION_FILES);
+         supDescriptors.add(CHANGE_LAST_MODIFIED_TIME);
+         supDescriptors.add(CHANGE_PERMISSIONS);
+         supDescriptors.add(CHANGE_OWNER);
+         supDescriptors.add(CHANGE_GROUP);
+         properties = Collections.unmodifiableList(supDescriptors);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final StopWatch stopWatch = new StopWatch(true);
+         final Path configuredRootDirPath = Paths.get(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
+         final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+         final Integer maxDestinationFiles = context.getProperty(MAX_DESTINATION_FILES).asInteger();
+         final ProcessorLog logger = getLogger();
+ 
+         Path tempDotCopyFile = null;
+         try {
+             final Path rootDirPath = configuredRootDirPath;
+             final Path tempCopyFile = rootDirPath.resolve("." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+             final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ 
+             if (!Files.exists(rootDirPath)) {
+                 if (context.getProperty(CREATE_DIRS).asBoolean()) {
+                     Files.createDirectories(rootDirPath);
+                 } else {
+                     flowFile = session.penalize(flowFile);
+                     session.transfer(flowFile, REL_FAILURE);
+                     logger.error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is configured not to create missing directories", new Object[]{flowFile, rootDirPath});
+                     return;
+                 }
+             }
+ 
+             final Path dotCopyFile = tempCopyFile;
+             tempDotCopyFile = dotCopyFile;
+             Path finalCopyFile = copyFile;
+ 
+             final Path finalCopyFileDir = finalCopyFile.getParent();
+             if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already
+                 final int numFiles = finalCopyFileDir.toFile().list().length;
+ 
+                 if (numFiles >= maxDestinationFiles) {
+                     flowFile = session.penalize(flowFile);
+                     logger.info("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the configured maximum number of files", new Object[]{flowFile, finalCopyFileDir, numFiles});
+                     session.transfer(flowFile, REL_FAILURE);
+                     return;
+                 }
+             }
+ 
+             if (Files.exists(finalCopyFile)) {
+                 switch (conflictResponse) {
+                     case REPLACE_RESOLUTION:
+                         Files.delete(finalCopyFile);
+                         logger.info("Deleted {} as configured in order to replace with the contents of {}", new Object[]{finalCopyFile, flowFile});
+                         break;
+                     case IGNORE_RESOLUTION:
+                         session.transfer(flowFile, REL_SUCCESS);
+                         logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile});
+                         return;
+                     case FAIL_RESOLUTION:
+                         flowFile = session.penalize(flowFile);
+                         logger.info("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile});
+                         session.transfer(flowFile, REL_FAILURE);
+                         return;
+                     default:
+                         break;
+                 }
+             }
+ 
+             session.exportTo(flowFile, dotCopyFile, false);
+ 
+             final String lastModifiedTime = context.getProperty(CHANGE_LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
+             if (lastModifiedTime != null && !lastModifiedTime.trim().isEmpty()) {
+                 try {
+                     final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+                     final Date fileModifyTime = formatter.parse(lastModifiedTime);
+                     dotCopyFile.toFile().setLastModified(fileModifyTime.getTime());
+                 } catch (Exception e) {
+                     logger.warn("Could not set file lastModifiedTime to {} because {}", new Object[]{lastModifiedTime, e});
+                 }
+             }
+ 
+             final String permissions = context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
+             if (permissions != null && !permissions.trim().isEmpty()) {
+                 try {
+                     String perms = stringPermissions(permissions);
+                     if (!perms.isEmpty()) {
+                         Files.setPosixFilePermissions(dotCopyFile, PosixFilePermissions.fromString(perms));
+                     }
+                 } catch (Exception e) {
+                     logger.warn("Could not set file permissions to {} because {}", new Object[]{permissions, e});
+                 }
+             }
+ 
+             final String owner = context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
+             if (owner != null && !owner.trim().isEmpty()) {
+                 try {
+                     UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService();
+                     Files.setOwner(dotCopyFile, lookupService.lookupPrincipalByName(owner));
+                 } catch (Exception e) {
+                     logger.warn("Could not set file owner to {} because {}", new Object[]{owner, e});
+                 }
+             }
+ 
+             final String group = context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
+             if (group != null && !group.trim().isEmpty()) {
+                 try {
+                     UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService();
+                     PosixFileAttributeView view = Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class);
+                     view.setGroup(lookupService.lookupPrincipalByGroupName(group));
+                 } catch (Exception e) {
+                     logger.warn("Could not set file group to {} because {}", new Object[]{group, e});
+                 }
+             }
+ 
+             boolean renamed = false;
+             for (int i = 0; i < 10; i++) { // try rename up to 10 times.
+                 if (dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) {
+                     renamed = true;
+                     break;// rename was successful
+                 }
+                 Thread.sleep(100L);// try waiting a few ms to let whatever might cause rename failure to resolve
+             }
+ 
+             if (!renamed) {
+                 if (Files.exists(dotCopyFile) && dotCopyFile.toFile().delete()) {
+                     logger.debug("Deleted dot copy file {}", new Object[]{dotCopyFile});
+                 }
+                 throw new ProcessException("Could not rename: " + dotCopyFile);
+             } else {
+                 logger.info("Produced copy of {} at location {}", new Object[]{flowFile, finalCopyFile});
+             }
+ 
+             session.getProvenanceReporter().send(flowFile, finalCopyFile.toFile().toURI().toString(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         } catch (final Throwable t) {
+             if (tempDotCopyFile != null) {
+                 try {
+                     Files.deleteIfExists(tempDotCopyFile);
+                 } catch (final Exception e) {
+                     logger.error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+                 }
+             }
+ 
+             flowFile = session.penalize(flowFile);
+             logger.error("Penalizing {} and transferring to failure due to {}", new Object[]{flowFile, t});
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ 
+     protected String stringPermissions(String perms) {
+         String permissions = "";
+         final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
+         final Pattern numPattern = Pattern.compile("\\d+");
+         if (rwxPattern.matcher(perms).matches()) {
+             permissions = perms;
+         } else if (numPattern.matcher(perms).matches()) {
+             try {
+                 int number = Integer.parseInt(perms, 8);
+                 StringBuilder permBuilder = new StringBuilder();
+                 if ((number & 0x100) > 0) {
+                     permBuilder.append('r');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x80) > 0) {
+                     permBuilder.append('w');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x40) > 0) {
+                     permBuilder.append('x');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x20) > 0) {
+                     permBuilder.append('r');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x10) > 0) {
+                     permBuilder.append('w');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x8) > 0) {
+                     permBuilder.append('x');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x4) > 0) {
+                     permBuilder.append('r');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x2) > 0) {
+                     permBuilder.append('w');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 if ((number & 0x1) > 0) {
+                     permBuilder.append('x');
+                 } else {
+                     permBuilder.append('-');
+                 }
+                 permissions = permBuilder.toString();
+             } catch (NumberFormatException ignore) {
+             }
+         }
+         return permissions;
+     }
+ }
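
For reference, a minimal standalone sketch of the octal-to-rwx conversion that stringPermissions performs above; the class and method names here are illustrative only and are not part of this patch:

import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class PermissionStringSketch {

    // Converts an octal mode string such as "644" into the nine-character
    // rwx form ("rw-r--r--") accepted by PosixFilePermissions.fromString.
    static String octalToRwx(final String octal) {
        final int mode = Integer.parseInt(octal, 8);
        final char[] symbols = {'r', 'w', 'x'};
        final StringBuilder sb = new StringBuilder(9);
        for (int shift = 8; shift >= 0; shift--) {
            sb.append((mode & (1 << shift)) != 0 ? symbols[(8 - shift) % 3] : '-');
        }
        return sb.toString();
    }

    public static void main(final String[] args) {
        final String rwx = octalToRwx("644");  // "rw-r--r--"
        final Set<PosixFilePermission> perms = PosixFilePermissions.fromString(rwx);
        System.out.println(rwx + " -> " + perms);
    }
}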

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index 0000000,39b17c7..ce5bea5
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@@ -1,0 -1,374 +1,374 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import static org.apache.nifi.processors.standard.util.JmsFactory.ATTRIBUTE_PREFIX;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.ATTRIBUTE_TYPE_SUFFIX;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_BOOLEAN;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_BYTE;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_DOUBLE;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_FLOAT;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_INTEGER;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_LONG;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_OBJECT;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_SHORT;
+ import static org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_STRING;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.ATTRIBUTES_TO_JMS_PROPS;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MAX_BUFFER_SIZE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_PRIORITY;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_TTL;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_TYPE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BYTE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Queue;
+ import java.util.Set;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ 
+ import javax.jms.BytesMessage;
+ import javax.jms.Destination;
+ import javax.jms.JMSException;
+ import javax.jms.Message;
+ import javax.jms.MessageProducer;
+ import javax.jms.Session;
+ import javax.jms.StreamMessage;
+ 
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processors.standard.util.JmsFactory;
+ import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
+ 
+ @Tags({"jms", "send", "put"})
+ @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile and sends the message to a JMS Server")
+ public class PutJMS extends AbstractProcessor {
+ 
+     public static final Charset UTF8 = Charset.forName("UTF-8");
+     public static final int DEFAULT_MESSAGE_PRIORITY = 4;
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+             .description("All FlowFiles that are sent to the JMS destination are routed to this relationship").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+             .description("All FlowFiles that cannot be routed to the JMS destination are routed to this relationship").build();
+ 
+     private final Queue<WrappedMessageProducer> producerQueue = new LinkedBlockingQueue<>();
+     private final List<PropertyDescriptor> properties;
+     private final Set<Relationship> relationships;
+ 
+     public PutJMS() {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(JMS_PROVIDER);
+         descriptors.add(URL);
+         descriptors.add(DESTINATION_NAME);
+         descriptors.add(DESTINATION_TYPE);
+         descriptors.add(TIMEOUT);
+         descriptors.add(BATCH_SIZE);
+         descriptors.add(USERNAME);
+         descriptors.add(PASSWORD);
+         descriptors.add(MESSAGE_TYPE);
+         descriptors.add(MESSAGE_PRIORITY);
+         descriptors.add(REPLY_TO_QUEUE);
+         descriptors.add(MAX_BUFFER_SIZE);
+         descriptors.add(MESSAGE_TTL);
+         descriptors.add(ATTRIBUTES_TO_JMS_PROPS);
+         descriptors.add(CLIENT_ID_PREFIX);
+         this.properties = Collections.unmodifiableList(descriptors);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @OnStopped
+     public void cleanupResources() {
+         WrappedMessageProducer wrappedProducer = producerQueue.poll();
+         while (wrappedProducer != null) {
+             wrappedProducer.close(getLogger());
+             wrappedProducer = producerQueue.poll();
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+         final List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger().intValue());
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         WrappedMessageProducer wrappedProducer = producerQueue.poll();
+         if (wrappedProducer == null) {
+             try {
+                 wrappedProducer = JmsFactory.createMessageProducer(context, true);
+                 logger.info("Connected to JMS server {}", new Object[]{context.getProperty(URL).getValue()});
+             } catch (final JMSException e) {
+                 logger.error("Failed to connect to JMS Server due to {}", new Object[]{e});
+                 session.transfer(flowFiles, REL_FAILURE);
+                 context.yield();
+                 return;
+             }
+         }
+ 
+         final Session jmsSession = wrappedProducer.getSession();
+         final MessageProducer producer = wrappedProducer.getProducer();
+ 
+         final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         try {
+             final Set<FlowFile> successfulFlowFiles = new HashSet<>();
+ 
+             for (FlowFile flowFile : flowFiles) {
+                 if (flowFile.getSize() > maxBufferSize) {
+                     session.transfer(flowFile, REL_FAILURE);
+                     logger.warn("Routing {} to failure because its size exceeds the configured max", new Object[]{flowFile});
+                     continue;
+                 }
+ 
+                 // Read the contents of the FlowFile into a byte array
+                 final byte[] messageContent = new byte[(int) flowFile.getSize()];
+                 session.read(flowFile, new InputStreamCallback() {
+                     @Override
+                     public void process(final InputStream in) throws IOException {
+                         StreamUtils.fillBuffer(in, messageContent, true);
+                     }
+                 });
+ 
+                 final Long ttl = context.getProperty(MESSAGE_TTL).asTimePeriod(TimeUnit.MILLISECONDS);
+ 
+                 final String replyToQueueName = context.getProperty(REPLY_TO_QUEUE).evaluateAttributeExpressions(flowFile).getValue();
+                 final Destination replyToQueue = replyToQueueName == null ? null : JmsFactory.createQueue(context, replyToQueueName);
+ 
+                 int priority = DEFAULT_MESSAGE_PRIORITY;
+                 try {
+                     final Integer priorityInt = context.getProperty(MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).asInteger();
+                     priority = priorityInt == null ? priority : priorityInt;
+                 } catch (final NumberFormatException e) {
+                     logger.warn("Invalid value for JMS Message Priority: {}; defaulting to priority of {}", new Object[]{
+                         context.getProperty(MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(), DEFAULT_MESSAGE_PRIORITY});
+                 }
+ 
+                 try {
+                     final Message message = createMessage(jmsSession, context, messageContent, flowFile, replyToQueue, priority);
+                     if (ttl == null) {
+                         producer.setTimeToLive(0L);
+                     } else {
+                         producer.setTimeToLive(ttl);
+                     }
+                     producer.send(message);
+                 } catch (final JMSException e) {
+                     logger.error("Failed to send {} to JMS Server due to {}", new Object[]{flowFile, e});
+                     session.transfer(flowFiles, REL_FAILURE);
+                     context.yield();
+ 
+                     try {
+                         jmsSession.rollback();
+                     } catch (final JMSException jmse) {
+                         logger.warn("Unable to roll back JMS Session due to {}", new Object[]{jmse});
+                     }
+ 
+                     wrappedProducer.close(logger);
+                     return;
+                 }
+ 
+                 successfulFlowFiles.add(flowFile);
+                 session.getProvenanceReporter().send(flowFile, "jms://" + context.getProperty(URL).getValue());
+             }
+ 
+             try {
+                 jmsSession.commit();
+ 
+                 session.transfer(successfulFlowFiles, REL_SUCCESS);
+                 final String flowFileDescription = successfulFlowFiles.size() > 10 ? successfulFlowFiles.size() + " FlowFiles" : successfulFlowFiles.toString();
+                 logger.info("Sent {} to JMS Server and transferred to 'success'", new Object[]{flowFileDescription});
+             } catch (JMSException e) {
+                 logger.error("Failed to commit JMS Session due to {}; rolling back session", new Object[]{e});
+                 session.rollback();
+                 wrappedProducer.close(logger);
+             }
+         } finally {
+             if (!wrappedProducer.isClosed()) {
+                 producerQueue.offer(wrappedProducer);
+             }
+         }
+     }
+ 
+     private Message createMessage(final Session jmsSession, final ProcessContext context, final byte[] messageContent,
+             final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
+         final Message message;
+ 
+         switch (context.getProperty(MESSAGE_TYPE).getValue()) {
+             case MSG_TYPE_EMPTY: {
+                 message = jmsSession.createTextMessage("");
+             }
+             break;
+             case MSG_TYPE_STREAM: {
+                 final StreamMessage streamMessage = jmsSession.createStreamMessage();
+                 streamMessage.writeBytes(messageContent);
+                 message = streamMessage;
+             }
+             break;
+             case MSG_TYPE_TEXT: {
+                 message = jmsSession.createTextMessage(new String(messageContent, UTF8));
+             }
+             break;
+             case MSG_TYPE_BYTE:
+             default: {
+                 final BytesMessage bytesMessage = jmsSession.createBytesMessage();
+                 bytesMessage.writeBytes(messageContent);
+                 message = bytesMessage;
+             }
+         }
+ 
+         message.setJMSTimestamp(System.currentTimeMillis());
+ 
+         if (replyToQueue != null) {
+             message.setJMSReplyTo(replyToQueue);
+         }
+ 
+         if (priority != null) {
+             message.setJMSPriority(priority);
+         }
+ 
+         if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean()) {
+             copyAttributesToJmsProps(flowFile, message);
+         }
+ 
+         return message;
+     }
+ 
+     /**
+      * Iterates through all of the flow file's metadata and for any metadata key
+      * that starts with <code>jms.</code>, the value for the corresponding key
+      * is written to the JMS message as a property. The name of this property is
+      * equal to the key of the flow file's metadata minus the <code>jms.</code>.
+      * For example, if the flowFile has a metadata entry:
+      * <br /><br />
+      * <code>jms.count</code> = <code>8</code>
+      * <br /><br />
+      * then the JMS message will have a String property added to it with the
+      * property name <code>count</code> and value <code>8</code>.
+      *
+      * If the flow file also has a metadata key with the name
+      * <code>jms.count.type</code>, then the value of that metadata entry will
+      * determine the JMS property type to use for the value. For example, if the
+      * flow file has the following properties:
+      * <br /><br />
+      * <code>jms.count</code> = <code>8</code><br />
+      * <code>jms.count.type</code> = <code>integer</code>
+      * <br /><br />
+      * Then <code>message</code> will have an INTEGER property added with the
+      * value 8.
+      * <br /><br />
+      * If the type is not valid for the given value (e.g.,
+      * <code>jms.count.type</code> = <code>integer</code> and
+      * <code>jms.count</code> = <code>hello</code>), then this JMS property will
+      * not be added to <code>message</code>.
+      *
+      * @param flowFile The flow file whose metadata should be examined for JMS
+      * properties.
+      * @param message The JMS message to which we want to add properties.
+      * @throws JMSException
+      */
+     private void copyAttributesToJmsProps(final FlowFile flowFile, final Message message) throws JMSException {
+         final ProcessorLog logger = getLogger();
+ 
+         final Map<String, String> attributes = flowFile.getAttributes();
+         for (final Entry<String, String> entry : attributes.entrySet()) {
+             final String key = entry.getKey();
+             final String value = entry.getValue();
+ 
+             if (key.toLowerCase().startsWith(ATTRIBUTE_PREFIX.toLowerCase())
+                     && !key.toLowerCase().endsWith(ATTRIBUTE_TYPE_SUFFIX.toLowerCase())) {
+ 
+                 final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
+                 final String type = attributes.get(key + ATTRIBUTE_TYPE_SUFFIX);
+ 
+                 try {
+                     if (type == null || type.equalsIgnoreCase(PROP_TYPE_STRING)) {
+                         message.setStringProperty(jmsPropName, value);
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_INTEGER)) {
+                         message.setIntProperty(jmsPropName, Integer.parseInt(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_BOOLEAN)) {
+                         message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_SHORT)) {
+                         message.setShortProperty(jmsPropName, Short.parseShort(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_LONG)) {
+                         message.setLongProperty(jmsPropName, Long.parseLong(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_BYTE)) {
+                         message.setByteProperty(jmsPropName, Byte.parseByte(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_DOUBLE)) {
+                         message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_FLOAT)) {
+                         message.setFloatProperty(jmsPropName, Float.parseFloat(value));
+                     } else if (type.equalsIgnoreCase(PROP_TYPE_OBJECT)) {
+                         message.setObjectProperty(jmsPropName, value);
+                     } else {
+                         logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property",
+                                 new Object[]{key, flowFile, value});
+                     }
+                 } catch (NumberFormatException e) {
+                     logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property",
+                             new Object[]{key, flowFile, value, key + ATTRIBUTE_TYPE_SUFFIX, PROP_TYPE_INTEGER});
+                 }
+             }
+         }
+     }
+ }
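
For reference, a minimal standalone sketch of the "jms."-prefixed attribute mapping described in the copyAttributesToJmsProps Javadoc above; the helper class, method name, and hard-coded "jms." / ".type" literals are illustrative only and are not part of this patch, and only the String and integer cases are shown:

import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;

public class JmsAttributeMappingSketch {

    // Copies "jms."-prefixed FlowFile attributes onto a JMS message as properties,
    // consulting the companion "<attribute>.type" entry to pick the property type.
    static void applyJmsAttributes(final Map<String, String> attributes, final Message message) throws JMSException {
        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
            final String key = entry.getKey();
            if (!key.startsWith("jms.") || key.endsWith(".type")) {
                continue; // skip non-JMS attributes and the companion type entries
            }
            final String propName = key.substring("jms.".length());
            final String type = attributes.get(key + ".type");
            if ("integer".equalsIgnoreCase(type)) {
                message.setIntProperty(propName, Integer.parseInt(entry.getValue()));
            } else {
                message.setStringProperty(propName, entry.getValue()); // String is the default type
            }
        }
    }
}

Given the attributes {"jms.count" = "8", "jms.count.type" = "integer"}, this would add an integer property named "count" with value 8 to the outgoing message, matching the example in the Javadoc above.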

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
index 0000000,a8d9c18..cfd522c
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
@@@ -1,0 -1,85 +1,85 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processors.standard.util.SFTPTransfer;
+ 
+ @SupportsBatching
+ @Tags({"remote", "copy", "egress", "put", "sftp", "archive", "files"})
+ @CapabilityDescription("Sends FlowFiles to an SFTP Server")
+ public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
+ 
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(SFTPTransfer.HOSTNAME);
+         properties.add(SFTPTransfer.PORT);
+         properties.add(SFTPTransfer.USERNAME);
+         properties.add(SFTPTransfer.PASSWORD);
+         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
+         properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
+         properties.add(SFTPTransfer.REMOTE_PATH);
+         properties.add(SFTPTransfer.CREATE_DIRECTORY);
+         properties.add(SFTPTransfer.BATCH_SIZE);
+         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
+         properties.add(SFTPTransfer.DATA_TIMEOUT);
+         properties.add(SFTPTransfer.CONFLICT_RESOLUTION);
+         properties.add(SFTPTransfer.REJECT_ZERO_BYTE);
+         properties.add(SFTPTransfer.DOT_RENAME);
+         properties.add(SFTPTransfer.TEMP_FILENAME);
+         properties.add(SFTPTransfer.HOST_KEY_FILE);
+         properties.add(SFTPTransfer.LAST_MODIFIED_TIME);
+         properties.add(SFTPTransfer.PERMISSIONS);
+         properties.add(SFTPTransfer.REMOTE_OWNER);
+         properties.add(SFTPTransfer.REMOTE_GROUP);
+         properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
+         properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+         properties.add(SFTPTransfer.USE_COMPRESSION);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+         if (SFTPTransfer.DISABLE_DIRECTORY_LISTING.getName().equalsIgnoreCase(propertyDescriptorName)) {
+             return SFTPTransfer.DISABLE_DIRECTORY_LISTING;
+         }
+         return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
+     }
+ 
+     @Override
+     protected SFTPTransfer getFileTransfer(final ProcessContext context) {
+         return new SFTPTransfer(context, getLogger());
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 0000000,ae5350b..111dead
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@@ -1,0 -1,289 +1,289 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import java.io.BufferedWriter;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.FlowFileFilters;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.NLKBufferedReader;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
+ import java.nio.charset.Charset;
+ import java.util.*;
+ import java.util.concurrent.TimeUnit;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex"})
+ @CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that matches the Regular Expression with some alternate value.")
+ public class ReplaceText extends AbstractProcessor {
+ 
+     //Constants
+     public static final String LINE_BY_LINE = "Line-by-Line";
+     public static final String ENTIRE_TEXT = "Entire text";
+     private final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)");
+     private static final byte[] ZERO_BYTE_BUFFER = new byte[0];
+     // Properties
+     public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder()
+             .name("Regular Expression")
+             .description("The Regular Expression to search for in the FlowFile content")
+             .required(true)
+             .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
+             .expressionLanguageSupported(true)
+             .defaultValue("(.*)")
+             .build();
+     public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
+             .name("Replacement Value")
+             .description("The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as a literal value.")
+             .required(true)
+             .defaultValue("$1")
+             .addValidator(Validator.VALID)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+             .name("Character Set")
+             .description("The Character Set in which the file is encoded")
+             .required(true)
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .defaultValue("UTF-8")
+             .build();
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("Maximum Buffer Size")
+             .description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                     + "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .build();
+     public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
+             .name("Evaluation Mode")
+             .description("Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.")
+             .allowableValues(LINE_BY_LINE, ENTIRE_TEXT)
+             .defaultValue(ENTIRE_TEXT)
+             .required(true)
+             .build();
+     // Relationships
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that could not be updated are routed to this relationship").build();
+     //
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(REGEX);
+         properties.add(REPLACEMENT_VALUE);
+         properties.add(CHARACTER_SET);
+         properties.add(MAX_BUFFER_SIZE);
+         properties.add(EVALUATION_MODE);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 100));
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final String unsubstitutedRegex = context.getProperty(REGEX).getValue();
+         String unsubstitutedReplacement = context.getProperty(REPLACEMENT_VALUE).getValue();
+         if (unsubstitutedRegex.equals("(.*)") && unsubstitutedReplacement.equals("$1")) {
+             // This pattern says replace content with itself. We can highly optimize this process by simply transferring
+             // all FlowFiles to the 'success' relationship
+             session.transfer(flowFiles, REL_SUCCESS);
+             return;
+         }
+ 
+         final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return Pattern.quote(attributeValue);
+             }
+         };
+ 
+         final AttributeValueDecorator escapeBackRefDecorator = new AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return attributeValue.replace("$", "\\$");
+             }
+         };
+ 
+         final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
+         final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
+ 
+         final boolean skipBuffer = ".*".equals(unsubstitutedRegex);
+ 
+         final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+         final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         final byte[] buffer = skipBuffer ? ZERO_BYTE_BUFFER : new byte[maxBufferSize];
+ 
+         final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();
+ 
+         for (FlowFile flowFile : flowFiles) {
+             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+                 if (flowFile.getSize() > maxBufferSize && !skipBuffer) {
+                     session.transfer(flowFile, REL_FAILURE);
+                     continue;
+                 }
+             }
+ 
+             String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, escapeBackRefDecorator).getValue();
+             final Matcher backRefMatcher = backReferencePattern.matcher(replacement);
+             while (backRefMatcher.find()) {
+                 final String backRefNum = backRefMatcher.group(1);
+                 if (backRefNum.startsWith("0")) {
+                     continue;
+                 }
+                 final int originalBackRefIndex = Integer.parseInt(backRefNum);
+                 int backRefIndex = originalBackRefIndex;
+ 
+                 // if we have a replacement value like $123, and we have less than 123 capturing groups, then 
+                 // we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
+                 // then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
+                 // we want to truncate the 1 and get 0.
+                 while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
+                     backRefIndex /= 10;
+                 }
+ 
+                 if (backRefIndex > numCapturingGroups) {
+                     final StringBuilder sb = new StringBuilder(replacement.length() + 1);
+                     final int groupStart = backRefMatcher.start(1);
+ 
+                     sb.append(replacement.substring(0, groupStart - 1));
+                     sb.append("\\");
+                     sb.append(replacement.substring(groupStart - 1));
+                     replacement = sb.toString();
+                 }
+             }
+ 
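+             // Escape any dollar sign that is followed by a non-digit so it is treated as a literal character during the content replacement.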
+             replacement = replacement.replaceAll("(\\$\\D)", "\\\\$1");
+ 
+             // always match; just overwrite value with the replacement value; this optimization prevents us
+             // from reading the file at all.
+             final String replacementValue = replacement;
+             if (skipBuffer) {
+                 final StopWatch stopWatch = new StopWatch(true);
+                 if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+                     flowFile = session.write(flowFile, new OutputStreamCallback() {
+                         @Override
+                         public void process(final OutputStream out) throws IOException {
+                             out.write(replacementValue.getBytes(charset));
+                         }
+                     });
+                 } else {
+                     flowFile = session.write(flowFile, new StreamCallback() {
+                         @Override
+                         public void process(final InputStream in, final OutputStream out) throws IOException {
+                             try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
+                                     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
+                                 while (null != br.readLine()) {
+                                     bw.write(replacementValue);
+                                 }
+                             }
+                         }
+                     });
+                 }
+                 session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                 session.transfer(flowFile, REL_SUCCESS);
+                 logger.info("Transferred {} to 'success'", new Object[]{flowFile});
+                 continue;
+             }
+ 
+             final StopWatch stopWatch = new StopWatch(true);
+             final String regex = context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
+ 
+             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
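+                 // The FlowFile is known to fit within the buffer (oversized FlowFiles were routed to 'failure' above),
+                 // so the entire content can be read, transformed, and written back in a single pass.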
+                 final int flowFileSize = (int) flowFile.getSize();
+                 flowFile = session.write(flowFile, new StreamCallback() {
+                     @Override
+                     public void process(final InputStream in, final OutputStream out) throws IOException {
+                         StreamUtils.fillBuffer(in, buffer, false);
+                         final String contentString = new String(buffer, 0, flowFileSize, charset);
+                         final String updatedValue = contentString.replaceAll(regex, replacementValue);
+                         out.write(updatedValue.getBytes(charset));
+                     }
+                 });
+             } else {
+                 flowFile = session.write(flowFile, new StreamCallback() {
+                     @Override
+                     public void process(final InputStream in, final OutputStream out) throws IOException {
+                         try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
+                                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
+                             String oneLine;
+                             while (null != (oneLine = br.readLine())) {
+                                 final String updatedValue = oneLine.replaceAll(regex, replacementValue);
+                                 bw.write(updatedValue);
+                             }
+                         }
+                     }
+                 });
+             }
+ 
+             logger.info("Transferred {} to 'success'", new Object[]{flowFile});
+             session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
index 0000000,c99935b..c4dd83a
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
@@@ -1,0 -1,383 +1,383 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex", "Mapping"})
+ @CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that matches the Regular Expression with some alternate value provided in a mapping file.")
+ public class ReplaceTextWithMapping extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder()
+             .name("Regular Expression")
+             .description("The Regular Expression to search for in the FlowFile content")
+             .required(true)
+             .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
+             .expressionLanguageSupported(true)
+             .defaultValue("\\S+")
+             .build();
+     public static final PropertyDescriptor MATCHING_GROUP_FOR_LOOKUP_KEY = new PropertyDescriptor.Builder()
+             .name("Matching Group")
+             .description("The number of the matching group of the provided regex to replace with the corresponding value from the mapping file (if it exists).")
+             .addValidator(StandardValidators.INTEGER_VALIDATOR)
+             .required(true)
+             .expressionLanguageSupported(true)
+             .defaultValue("0").build();
+     public static final PropertyDescriptor MAPPING_FILE = new PropertyDescriptor.Builder()
+             .name("Mapping File")
+             .description("The name of the file (including the full path) containing the Mappings.")
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .required(true)
+             .build();
+     public static final PropertyDescriptor MAPPING_FILE_REFRESH_INTERVAL = new PropertyDescriptor.Builder()
+             .name("Mapping File Refresh Interval")
+             .description("The polling interval in seconds to check for updates to the mapping file. The default is 60s.")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .required(true)
+             .defaultValue("60s").build();
+     public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+             .name("Character Set")
+             .description("The Character Set in which the file is encoded")
+             .required(true)
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .defaultValue("UTF-8")
+             .build();
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("Maximum Buffer Size")
+             .description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. If a FlowFile is larger than this value, the FlowFile will be routed to 'failure'")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression").build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that could not be updated are routed to this relationship").build();
+ 
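+     // Matches a back-reference such as $1 that is not preceded by a backslash; group 1 captures the group number.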
+     private final Pattern backReferencePattern = Pattern.compile("[^\\\\]\\$(\\d+)");
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     private final ReentrantLock processorLock = new ReentrantLock();
+     private final AtomicLong lastModified = new AtomicLong(0L);
+     final AtomicLong mappingTestTime = new AtomicLong(0);
+     private final AtomicReference<ConfigurationState> configurationStateRef = new AtomicReference<>(
+             new ConfigurationState(null));
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+         final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
+ 
+         final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
+         final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
+         final int groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();
+ 
+         if (groupToMatch > numCapturingGroups) {
+             errors.add(new ValidationResult.Builder().subject("Insufficient Matching Groups").valid(false).explanation("The specified matching group does not exist for the regular expression provided").build());
+         }
+         return errors;
+     }
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(REGEX);
+         properties.add(MATCHING_GROUP_FOR_LOOKUP_KEY);
+         properties.add(MAPPING_FILE);
+         properties.add(MAPPING_FILE_REFRESH_INTERVAL);
+         properties.add(CHARACTER_SET);
+         properties.add(MAX_BUFFER_SIZE);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+         updateMapping(context);
+         final List<FlowFile> flowFiles = session.get(5);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+ 
+         final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         for (FlowFile flowFile : flowFiles) {
+             if (flowFile.getSize() > maxBufferSize) {
+                 session.transfer(flowFile, REL_FAILURE);
+                 continue;
+             }
+ 
+             final StopWatch stopWatch = new StopWatch(true);
+ 
+             flowFile = session.write(flowFile, new ReplaceTextCallback(context, flowFile, maxBufferSize));
+ 
+             logger.info("Transferred {} to 'success'", new Object[]{flowFile});
+             session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         }
+     }
+ 
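+     // Escapes back-references in the replacement value that refer to capturing groups the regex does not define
+     // (as well as dollar signs that are not part of a back-reference) so they are treated as literal text.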
+     protected String fillReplacementValueBackReferences(String rawReplacementValue, int numCapturingGroups) {
+         String replacement = rawReplacementValue;
+         final Matcher backRefMatcher = backReferencePattern.matcher(replacement);
+         int replacementCount = 0;
+         while (backRefMatcher.find()) {
+             final int backRefIndex = Integer.parseInt(backRefMatcher.group(1));
+             if (backRefIndex > numCapturingGroups || backRefIndex < 0) {
+                 final StringBuilder sb = new StringBuilder(replacement.length() + 1);
+                 final int groupStart = backRefMatcher.start(1) + replacementCount++;
+ 
+                 sb.append(replacement.substring(0, groupStart - 1));
+                 sb.append("\\");
+                 sb.append(replacement.substring(groupStart - 1));
+                 replacement = sb.toString();
+             }
+         }
+ 
+         replacement = replacement.replaceAll("(\\$\\D)", "\\\\$1");
+ 
+         return replacement;
+     }
+ 
+     private void updateMapping(final ProcessContext context) {
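+         // Only one thread refreshes the mapping at a time; if the lock is already held, the refresh is simply skipped.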
+         if (processorLock.tryLock()) {
+             final ProcessorLog logger = getLogger();
+             try {
+                 // if the mapping file's last-modified time has not been checked within the
+                 // configured refresh interval, check it now.
+                 long currentTimeSecs = System.currentTimeMillis() / 1000;
+                 long mappingRefreshPeriodSecs = context.getProperty(MAPPING_FILE_REFRESH_INTERVAL).asTimePeriod(TimeUnit.SECONDS);
+ 
+                 boolean retry = (currentTimeSecs > (mappingTestTime.get() + mappingRefreshPeriodSecs));
+                 if (retry) {
+                     mappingTestTime.set(System.currentTimeMillis() / 1000);
+                     // see if the mapping file needs to be reloaded
+                     final String fileName = context.getProperty(MAPPING_FILE).getValue();
+                     final File file = new File(fileName);
+                     if (file.exists() && file.isFile() && file.canRead()) {
+                         if (file.lastModified() > lastModified.get()) {
+                             lastModified.getAndSet(file.lastModified());
+                             try (FileInputStream is = new FileInputStream(file)) {
+                                 logger.info("Reloading mapping file: {}", new Object[]{fileName});
+ 
+                                 final Map<String, String> mapping = loadMappingFile(is);
+                                 final ConfigurationState newState = new ConfigurationState(
+                                         mapping);
+                                 configurationStateRef.set(newState);
+                             } catch (IOException e) {
+                                 logger.error("Error reading mapping file: {}", new Object[]{e.getMessage()});
+                             }
+                         }
+                     } else {
+                         logger.error("Mapping file does not exist or is not readable: {}", new Object[]{fileName});
+                     }
+                 }
+             } catch (Exception e) {
+                 logger.error("Error loading mapping file: {}", new Object[]{e.getMessage()});
+             } finally {
+                 processorLock.unlock();
+             }
+         }
+     }
+ 
+     /**
+      * Loads a file containing mappings.
+      *
+      * @param is the input stream to read the mapping entries from
+      * @return a map whose keys are the lookup values and whose values are their replacements
+      * @throws IOException if the stream cannot be read
+      */
+     protected Map<String, String> loadMappingFile(InputStream is) throws IOException {
+         Map<String, String> mapping = new HashMap<>();
+         BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+         String line = null;
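+         // Each line of the mapping file contains a key and a value separated by a tab or space.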
+         while ((line = reader.readLine()) != null) {
+             final String[] splits = StringUtils.split(line, "\t ", 2);
+             if (splits.length == 1) {
+                 mapping.put(splits[0].trim(), ""); // support key with empty value
+             } else if (splits.length == 2) {
+                 final String key = splits[0].trim();
+                 final String value = splits[1].trim();
+                 mapping.put(key, value);
+             }
+         }
+         return mapping;
+     }
+ 
+     public static class ConfigurationState {
+ 
+         final Map<String, String> mapping = new HashMap<>();
+ 
+         public ConfigurationState(final Map<String, String> mapping) {
+             if (mapping != null) {
+                 this.mapping.putAll(mapping);
+             }
+         }
+ 
+         public Map<String, String> getMapping() {
+             return Collections.unmodifiableMap(mapping);
+         }
+ 
+         public boolean isConfigured() {
+             return !mapping.isEmpty();
+         }
+     }
+ 
+     private final class ReplaceTextCallback implements StreamCallback {
+ 
+         private final Charset charset;
+         private final byte[] buffer;
+         private final String regex;
+         private final FlowFile flowFile;
+         private final int numCapturingGroups;
+         private final int groupToMatch;
+ 
+         private final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return Pattern.quote(attributeValue);
+             }
+         };
+ 
+         private ReplaceTextCallback(ProcessContext context, FlowFile flowFile, int maxBufferSize) {
+             this.regex = context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
+             this.flowFile = flowFile;
+ 
+             this.charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+ 
+             final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
+             this.numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
+ 
+             this.buffer = new byte[maxBufferSize];
+ 
+             this.groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();
+         }
+ 
+         @Override
+         public void process(final InputStream in, final OutputStream out) throws IOException {
+ 
+             final Map<String, String> mapping = configurationStateRef.get()
+                     .getMapping();
+ 
+             StreamUtils.fillBuffer(in, buffer, false);
+ 
+             final int flowFileSize = (int) flowFile.getSize();
+ 
+             final String contentString = new String(buffer, 0, flowFileSize, charset);
+ 
+             final Matcher matcher = Pattern.compile(regex).matcher(contentString);
+ 
+             matcher.reset();
+             boolean result = matcher.find();
+             if (result) {
+                 StringBuffer sb = new StringBuffer();
+                 do {
+                     String matched = matcher.group(groupToMatch);
+                     String rv = mapping.get(matched);
+ 
+                     if (rv == null) {
+                         String replacement = matcher.group().replace("$", "\\$");
+                         matcher.appendReplacement(sb, replacement);
+                     } else {
+                         String allRegexMatched = matcher.group(); //this is everything that matched the regex
+ 
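+                         // Offsets of the matching group relative to the start of the overall match, used to splice
+                         // the mapped value into the matched text while preserving the surrounding characters.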
+                         int scaledStart = matcher.start(groupToMatch) - matcher.start();
+                         int scaledEnd = scaledStart + matcher.group(groupToMatch).length();
+ 
+                         StringBuilder replacementBuilder = new StringBuilder();
+ 
+                         replacementBuilder.append(allRegexMatched.substring(0, scaledStart).replace("$", "\\$"));
+                         replacementBuilder.append(fillReplacementValueBackReferences(rv, numCapturingGroups));
+                         replacementBuilder.append(allRegexMatched.substring(scaledEnd).replace("$", "\\$"));
+ 
+                         matcher.appendReplacement(sb, replacementBuilder.toString());
+                     }
+                     result = matcher.find();
+                 } while (result);
+                 matcher.appendTail(sb);
+                 out.write(sb.toString().getBytes(charset));
+                 return;
+             }
+             out.write(contentString.getBytes(charset));
+         }
+     }
+ }