You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/27 13:45:22 UTC

[11/16] incubator-nifi git commit: NIFI-271 chipping away - more work left in standard bundle

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index da80546..893aee9 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -43,13 +43,23 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Base class for PutFTP & PutSFTP
+ *
  * @param <T>
  */
 public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractProcessor {
 
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to success").build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor").build();
-    public static final Relationship REL_REJECT = new Relationship.Builder().name("reject").description("FlowFiles that were rejected by the destination system").build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().
+            name("success").
+            description("FlowFiles that are successfully sent will be routed to success").
+            build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().
+            name("failure").
+            description("FlowFiles that failed to send to the remote system; failure is usually looped back to this processor").
+            build();
+    public static final Relationship REL_REJECT = new Relationship.Builder().
+            name("reject").
+            description("FlowFiles that were rejected by the destination system").
+            build();
 
     private final Set<Relationship> relationships;
 
@@ -85,26 +95,42 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
         }
 
         final ProcessorLog logger = getLogger();
-        final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String hostname = context.getProperty(FileTransfer.HOSTNAME).
+                evaluateAttributeExpressions(flowFile).
+                getValue();
 
-        final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger();
+        final int maxNumberOfFiles = context.
+                getProperty(FileTransfer.BATCH_SIZE).
+                asInteger();
         int fileCount = 0;
         try (final T transfer = getFileTransfer(context)) {
             do {
-                final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
+                final String rootPath = context.
+                        getProperty(FileTransfer.REMOTE_PATH).
+                        evaluateAttributeExpressions(flowFile).
+                        getValue();
                 final String workingDirPath;
                 if (rootPath == null) {
                     workingDirPath = null;
                 } else {
                     File workingDirectory = new File(rootPath);
-                    if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) {
-                        workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath());
+                    if (!workingDirectory.getPath().
+                            startsWith("/") && !workingDirectory.getPath().
+                            startsWith("\\")) {
+                        workingDirectory = new File(transfer.
+                                getHomeDirectory(flowFile), workingDirectory.
+                                getPath());
                     }
-                    workingDirPath = workingDirectory.getPath().replace("\\", "/");
+                    workingDirPath = workingDirectory.getPath().
+                            replace("\\", "/");
                 }
 
-                final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();
-                final ConflictResult conflictResult = identifyAndResolveConflictFile(context.getProperty(FileTransfer.CONFLICT_RESOLUTION).getValue(),
+                final boolean rejectZeroByteFiles = context.
+                        getProperty(FileTransfer.REJECT_ZERO_BYTE).
+                        asBoolean();
+                final ConflictResult conflictResult = identifyAndResolveConflictFile(context.
+                        getProperty(FileTransfer.CONFLICT_RESOLUTION).
+                        getValue(),
                         transfer, workingDirPath, flowFile, rejectZeroByteFiles, logger);
 
                 if (conflictResult.isTransfer()) {
@@ -118,28 +144,37 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
                         @Override
                         public void process(final InputStream in) throws IOException {
                             try (final InputStream bufferedIn = new BufferedInputStream(in)) {
-                                if (workingDirPath != null && context.getProperty(SFTPTransfer.CREATE_DIRECTORY).asBoolean()) {
-                                    transfer.ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath));
+                                if (workingDirPath != null && context.
+                                        getProperty(SFTPTransfer.CREATE_DIRECTORY).
+                                        asBoolean()) {
+                                    transfer.
+                                            ensureDirectoryExists(flowFileToTransfer, new File(workingDirPath));
                                 }
 
-                                fullPathRef.set(transfer.put(flowFileToTransfer, workingDirPath, conflictResult.getFileName(), bufferedIn));
+                                fullPathRef.set(transfer.
+                                        put(flowFileToTransfer, workingDirPath, conflictResult.
+                                                getFileName(), bufferedIn));
                             }
                         }
                     });
                     afterPut(flowFile, context, transfer);
 
                     stopWatch.stop();
-                    final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
-                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-                    logger.info("Successfully transfered {} to {} on remote host {} in {} milliseconds at a rate of {}",
-                            new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});
+                    final String dataRate = stopWatch.
+                            calculateDataRate(flowFile.getSize());
+                    final long millis = stopWatch.
+                            getDuration(TimeUnit.MILLISECONDS);
+                    logger.
+                            info("Successfully transfered {} to {} on remote host {} in {} milliseconds at a rate of {}",
+                                    new Object[]{flowFile, fullPathRef.get(), hostname, millis, dataRate});
 
                     String fullPathWithSlash = fullPathRef.get();
                     if (!fullPathWithSlash.startsWith("/")) {
                         fullPathWithSlash = "/" + fullPathWithSlash;
                     }
                     final String destinationUri = transfer.getProtocolName() + "://" + hostname + fullPathWithSlash;
-                    session.getProvenanceReporter().send(flowFile, destinationUri, millis);
+                    session.getProvenanceReporter().
+                            send(flowFile, destinationUri, millis);
                 }
 
                 if (conflictResult.isPenalize()) {
@@ -148,20 +183,28 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
 
                 session.transfer(flowFile, conflictResult.getRelationship());
                 session.commit();
-            } while (isScheduled() && (getRelationships().size() == context.getAvailableRelationships().size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.get()) != null));
+            } while (isScheduled() && (getRelationships().
+                    size() == context.getAvailableRelationships().
+                    size()) && (++fileCount < maxNumberOfFiles) && ((flowFile = session.
+                    get()) != null));
         } catch (final IOException e) {
             context.yield();
-            logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});
+            logger.
+                    error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         } catch (final FlowFileAccessException e) {
             context.yield();
-            logger.error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.getCause()});
+            logger.
+                    error("Unable to transfer {} to remote host {} due to {}", new Object[]{flowFile, hostname, e.
+                        getCause()});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         } catch (final ProcessException e) {
             context.yield();
-            logger.error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.getCause()});
+            logger.
+                    error("Unable to transfer {} to remote host {} due to {}: {}; routing to failure", new Object[]{flowFile, hostname, e, e.
+                        getCause()});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
@@ -179,53 +222,62 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
         if (rejectZeroByteFiles) {
             final long sizeInBytes = flowFile.getSize();
             if (sizeInBytes == 0) {
-                logger.warn("Rejecting {} because it is zero bytes", new Object[]{flowFile});
+                logger.
+                        warn("Rejecting {} because it is zero bytes", new Object[]{flowFile});
                 return new ConflictResult(REL_REJECT, false, fileName, true);
             }
         }
 
         //Second, check if the user doesn't care about detecting naming conflicts ahead of time
-        if (conflictResolutionType.equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) {
+        if (conflictResolutionType.
+                equalsIgnoreCase(FileTransfer.CONFLICT_RESOLUTION_NONE)) {
             return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
         }
 
-        final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, path, fileName);
+        final FileInfo remoteFileInfo = transfer.
+                getRemoteFileInfo(flowFile, path, fileName);
         if (remoteFileInfo == null) {
             return new ConflictResult(destinationRelationship, transferFile, fileName, penalizeFile);
         }
 
         if (remoteFileInfo.isDirectory()) {
-            logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+            logger.
+                    info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
             return new ConflictResult(REL_REJECT, false, fileName, false);
         }
 
-        logger.info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}",
-                new Object[]{flowFile, conflictResolutionType});
+        logger.
+                info("Discovered a filename conflict on the remote server for {} so handling using configured Conflict Resolution of {}",
+                        new Object[]{flowFile, conflictResolutionType});
 
         switch (conflictResolutionType.toUpperCase()) {
             case FileTransfer.CONFLICT_RESOLUTION_REJECT:
                 destinationRelationship = REL_REJECT;
                 transferFile = false;
                 penalizeFile = false;
-                logger.info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
+                logger.
+                        info("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
                 break;
             case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
                 transfer.deleteFile(path, fileName);
                 destinationRelationship = REL_SUCCESS;
                 transferFile = true;
                 penalizeFile = false;
-                logger.info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile});
+                logger.
+                        info("Resolving filename conflict for {} with remote server by deleting remote file and replacing with flow file", new Object[]{flowFile});
                 break;
             case FileTransfer.CONFLICT_RESOLUTION_RENAME:
                 boolean uniqueNameGenerated = false;
                 for (int i = 1; i < 100 && !uniqueNameGenerated; i++) {
                     String possibleFileName = i + "." + fileName;
 
-                    final FileInfo renamedFileInfo = transfer.getRemoteFileInfo(flowFile, path, possibleFileName);
+                    final FileInfo renamedFileInfo = transfer.
+                            getRemoteFileInfo(flowFile, path, possibleFileName);
                     uniqueNameGenerated = (renamedFileInfo == null);
                     if (uniqueNameGenerated) {
                         fileName = possibleFileName;
-                        logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName});
+                        logger.
+                                info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", new Object[]{flowFile, fileName});
                         destinationRelationship = REL_SUCCESS;
                         transferFile = true;
                         penalizeFile = false;
@@ -236,20 +288,23 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
                     destinationRelationship = REL_REJECT;
                     transferFile = false;
                     penalizeFile = false;
-                    logger.info("Could not determine a unique name after 99 attempts for.  Switching resolution mode to REJECT for " + flowFile);
+                    logger.
+                            info("Could not determine a unique name after 99 attempts for.  Switching resolution mode to REJECT for " + flowFile);
                 }
                 break;
             case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
                 destinationRelationship = REL_SUCCESS;
                 transferFile = false;
                 penalizeFile = false;
-                logger.info("Resolving conflict for {}  by not transferring file and and still considering the process a success.", new Object[]{flowFile});
+                logger.
+                        info("Resolving conflict for {}  by not transferring file and and still considering the process a success.", new Object[]{flowFile});
                 break;
             case FileTransfer.CONFLICT_RESOLUTION_FAIL:
                 destinationRelationship = REL_FAILURE;
                 transferFile = false;
                 penalizeFile = true;
-                logger.info("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile});
+                logger.
+                        info("Resolved filename conflict for {} as configured by routing to FAILURE relationship.", new Object[]{flowFile});
             default:
                 break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index c7c78a1..b37471e 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -96,10 +96,14 @@ public class PutJMS extends AbstractProcessor {
     public static final Charset UTF8 = Charset.forName("UTF-8");
     public static final int DEFAULT_MESSAGE_PRIORITY = 4;
 
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
-            .description("All FlowFiles that are sent to the JMS destination are routed to this relationship").build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
-            .description("All FlowFiles that cannot be routed to the JMS destination are routed to this relationship").build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().
+            name("success").
+            description("All FlowFiles that are sent to the JMS destination are routed to this relationship").
+            build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().
+            name("failure").
+            description("All FlowFiles that cannot be routed to the JMS destination are routed to this relationship").
+            build();
 
     private final Queue<WrappedMessageProducer> producerQueue = new LinkedBlockingQueue<>();
     private final List<PropertyDescriptor> properties;
@@ -152,7 +156,10 @@ public class PutJMS extends AbstractProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final ProcessorLog logger = getLogger();
-        final List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger().intValue());
+        final List<FlowFile> flowFiles = session.get(context.
+                getProperty(BATCH_SIZE).
+                asInteger().
+                intValue());
         if (flowFiles.isEmpty()) {
             return;
         }
@@ -160,10 +167,14 @@ public class PutJMS extends AbstractProcessor {
         WrappedMessageProducer wrappedProducer = producerQueue.poll();
         if (wrappedProducer == null) {
             try {
-                wrappedProducer = JmsFactory.createMessageProducer(context, true);
-                logger.info("Connected to JMS server {}", new Object[]{context.getProperty(URL).getValue()});
+                wrappedProducer = JmsFactory.
+                        createMessageProducer(context, true);
+                logger.info("Connected to JMS server {}", new Object[]{context.
+                    getProperty(URL).
+                    getValue()});
             } catch (final JMSException e) {
-                logger.error("Failed to connect to JMS Server due to {}", new Object[]{e});
+                logger.
+                        error("Failed to connect to JMS Server due to {}", new Object[]{e});
                 session.transfer(flowFiles, REL_FAILURE);
                 context.yield();
                 return;
@@ -173,7 +184,9 @@ public class PutJMS extends AbstractProcessor {
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
 
-        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).
+                asDataSize(DataUnit.B).
+                intValue();
 
         try {
             final Set<FlowFile> successfulFlowFiles = new HashSet<>();
@@ -181,7 +194,8 @@ public class PutJMS extends AbstractProcessor {
             for (FlowFile flowFile : flowFiles) {
                 if (flowFile.getSize() > maxBufferSize) {
                     session.transfer(flowFile, REL_FAILURE);
-                    logger.warn("Routing {} to failure because its size exceeds the configured max", new Object[]{flowFile});
+                    logger.
+                            warn("Routing {} to failure because its size exceeds the configured max", new Object[]{flowFile});
                     continue;
                 }
 
@@ -194,18 +208,29 @@ public class PutJMS extends AbstractProcessor {
                     }
                 });
 
-                final Long ttl = context.getProperty(MESSAGE_TTL).asTimePeriod(TimeUnit.MILLISECONDS);
+                final Long ttl = context.getProperty(MESSAGE_TTL).
+                        asTimePeriod(TimeUnit.MILLISECONDS);
 
-                final String replyToQueueName = context.getProperty(REPLY_TO_QUEUE).evaluateAttributeExpressions(flowFile).getValue();
-                final Destination replyToQueue = replyToQueueName == null ? null : JmsFactory.createQueue(context, replyToQueueName);
+                final String replyToQueueName = context.
+                        getProperty(REPLY_TO_QUEUE).
+                        evaluateAttributeExpressions(flowFile).
+                        getValue();
+                final Destination replyToQueue = replyToQueueName == null ? null : JmsFactory.
+                        createQueue(context, replyToQueueName);
 
                 int priority = DEFAULT_MESSAGE_PRIORITY;
                 try {
-                    final Integer priorityInt = context.getProperty(MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).asInteger();
+                    final Integer priorityInt = context.
+                            getProperty(MESSAGE_PRIORITY).
+                            evaluateAttributeExpressions(flowFile).
+                            asInteger();
                     priority = priorityInt == null ? priority : priorityInt;
                 } catch (final NumberFormatException e) {
-                    logger.warn("Invalid value for JMS Message Priority: {}; defaulting to priority of {}", new Object[]{
-                        context.getProperty(MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(), DEFAULT_MESSAGE_PRIORITY});
+                    logger.
+                            warn("Invalid value for JMS Message Priority: {}; defaulting to priority of {}", new Object[]{
+                                context.getProperty(MESSAGE_PRIORITY).
+                                evaluateAttributeExpressions(flowFile).
+                                getValue(), DEFAULT_MESSAGE_PRIORITY});
                 }
 
                 try {
@@ -217,14 +242,16 @@ public class PutJMS extends AbstractProcessor {
                     }
                     producer.send(message);
                 } catch (final JMSException e) {
-                    logger.error("Failed to send {} to JMS Server due to {}", new Object[]{flowFile, e});
+                    logger.
+                            error("Failed to send {} to JMS Server due to {}", new Object[]{flowFile, e});
                     session.transfer(flowFiles, REL_FAILURE);
                     context.yield();
 
                     try {
                         jmsSession.rollback();
                     } catch (final JMSException jmse) {
-                        logger.warn("Unable to roll back JMS Session due to {}", new Object[]{jmse});
+                        logger.
+                                warn("Unable to roll back JMS Session due to {}", new Object[]{jmse});
                     }
 
                     wrappedProducer.close(logger);
@@ -232,17 +259,22 @@ public class PutJMS extends AbstractProcessor {
                 }
 
                 successfulFlowFiles.add(flowFile);
-                session.getProvenanceReporter().send(flowFile, "jms://" + context.getProperty(URL).getValue());
+                session.getProvenanceReporter().
+                        send(flowFile, "jms://" + context.getProperty(URL).
+                                getValue());
             }
 
             try {
                 jmsSession.commit();
 
                 session.transfer(successfulFlowFiles, REL_SUCCESS);
-                final String flowFileDescription = successfulFlowFiles.size() > 10 ? successfulFlowFiles.size() + " FlowFiles" : successfulFlowFiles.toString();
-                logger.info("Sent {} to JMS Server and transferred to 'success'", new Object[]{flowFileDescription});
+                final String flowFileDescription = successfulFlowFiles.size() > 10 ? successfulFlowFiles.
+                        size() + " FlowFiles" : successfulFlowFiles.toString();
+                logger.
+                        info("Sent {} to JMS Server and transferred to 'success'", new Object[]{flowFileDescription});
             } catch (JMSException e) {
-                logger.error("Failed to commit JMS Session due to {}; rolling back session", new Object[]{e});
+                logger.
+                        error("Failed to commit JMS Session due to {}; rolling back session", new Object[]{e});
                 session.rollback();
                 wrappedProducer.close(logger);
             }
@@ -257,19 +289,22 @@ public class PutJMS extends AbstractProcessor {
             final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
         final Message message;
 
-        switch (context.getProperty(MESSAGE_TYPE).getValue()) {
+        switch (context.getProperty(MESSAGE_TYPE).
+                getValue()) {
             case MSG_TYPE_EMPTY: {
                 message = jmsSession.createTextMessage("");
                 break;
             }
             case MSG_TYPE_STREAM: {
-                final StreamMessage streamMessage = jmsSession.createStreamMessage();
+                final StreamMessage streamMessage = jmsSession.
+                        createStreamMessage();
                 streamMessage.writeBytes(messageContent);
                 message = streamMessage;
                 break;
             }
             case MSG_TYPE_TEXT: {
-                message = jmsSession.createTextMessage(new String(messageContent, UTF8));
+                message = jmsSession.
+                        createTextMessage(new String(messageContent, UTF8));
                 break;
             }
             case MSG_TYPE_MAP: {
@@ -278,7 +313,8 @@ public class PutJMS extends AbstractProcessor {
             }
             case MSG_TYPE_BYTE:
             default: {
-                final BytesMessage bytesMessage = jmsSession.createBytesMessage();
+                final BytesMessage bytesMessage = jmsSession.
+                        createBytesMessage();
                 bytesMessage.writeBytes(messageContent);
                 message = bytesMessage;
             }
@@ -294,7 +330,8 @@ public class PutJMS extends AbstractProcessor {
             message.setJMSPriority(priority);
         }
 
-        if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean()) {
+        if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).
+                asBoolean()) {
             copyAttributesToJmsProps(flowFile, message);
         }
 
@@ -332,7 +369,7 @@ public class PutJMS extends AbstractProcessor {
      * @param flowFile The flow file whose metadata should be examined for JMS
      * properties.
      * @param message The JMS message to which we want to add properties.
-     * @throws JMSException
+     * @throws JMSException if a property cannot be set on the JMS message
      */
     private void copyAttributesToJmsProps(final FlowFile flowFile, final Message message) throws JMSException {
         final ProcessorLog logger = getLogger();
@@ -342,38 +379,50 @@ public class PutJMS extends AbstractProcessor {
             final String key = entry.getKey();
             final String value = entry.getValue();
 
-            if (key.toLowerCase().startsWith(ATTRIBUTE_PREFIX.toLowerCase())
-                    && !key.toLowerCase().endsWith(ATTRIBUTE_TYPE_SUFFIX.toLowerCase())) {
+            if (key.toLowerCase().
+                    startsWith(ATTRIBUTE_PREFIX.toLowerCase())
+                    && !key.toLowerCase().
+                    endsWith(ATTRIBUTE_TYPE_SUFFIX.toLowerCase())) {
 
-                final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
+                final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.
+                        length());
                 final String type = attributes.get(key + ATTRIBUTE_TYPE_SUFFIX);
 
                 try {
                     if (type == null || type.equalsIgnoreCase(PROP_TYPE_STRING)) {
                         message.setStringProperty(jmsPropName, value);
                     } else if (type.equalsIgnoreCase(PROP_TYPE_INTEGER)) {
-                        message.setIntProperty(jmsPropName, Integer.parseInt(value));
+                        message.setIntProperty(jmsPropName, Integer.
+                                parseInt(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_BOOLEAN)) {
-                        message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
+                        message.setBooleanProperty(jmsPropName, Boolean.
+                                parseBoolean(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_SHORT)) {
-                        message.setShortProperty(jmsPropName, Short.parseShort(value));
+                        message.setShortProperty(jmsPropName, Short.
+                                parseShort(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_LONG)) {
-                        message.setLongProperty(jmsPropName, Long.parseLong(value));
+                        message.setLongProperty(jmsPropName, Long.
+                                parseLong(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_BYTE)) {
-                        message.setByteProperty(jmsPropName, Byte.parseByte(value));
+                        message.setByteProperty(jmsPropName, Byte.
+                                parseByte(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_DOUBLE)) {
-                        message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
+                        message.setDoubleProperty(jmsPropName, Double.
+                                parseDouble(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_FLOAT)) {
-                        message.setFloatProperty(jmsPropName, Float.parseFloat(value));
+                        message.setFloatProperty(jmsPropName, Float.
+                                parseFloat(value));
                     } else if (type.equalsIgnoreCase(PROP_TYPE_OBJECT)) {
                         message.setObjectProperty(jmsPropName, value);
                     } else {
-                        logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property",
-                                new Object[]{key, flowFile, value});
+                        logger.
+                                warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property",
+                                        new Object[]{key, flowFile, value});
                     }
                 } catch (NumberFormatException e) {
-                    logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property",
-                            new Object[]{key, flowFile, value, key + ATTRIBUTE_TYPE_SUFFIX, PROP_TYPE_INTEGER});
+                    logger.
+                            warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property",
+                                    new Object[]{key, flowFile, value, key + ATTRIBUTE_TYPE_SUFFIX, PROP_TYPE_INTEGER});
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
index 300878f..395ddee 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
@@ -34,7 +34,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
 @Tags({"remote", "copy", "egress", "put", "sftp", "archive", "files"})
 @CapabilityDescription("Sends FlowFiles to an SFTP Server")
 @SeeAlso(GetSFTP.class)
-@DynamicProperty(name="Disable Directory Listing", value="true or false", description="Disables directory listings before operations which might fail, such as configurations which create directory structures.")
+@DynamicProperty(name = "Disable Directory Listing", value = "true or false",
+        description = "Disables directory listings before operations which might fail, such as configurations which create directory structures.")
 public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
 
     private List<PropertyDescriptor> properties;
@@ -75,10 +76,12 @@ public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
-        if (SFTPTransfer.DISABLE_DIRECTORY_LISTING.getName().equalsIgnoreCase(propertyDescriptorName)) {
+        if (SFTPTransfer.DISABLE_DIRECTORY_LISTING.getName().
+                equalsIgnoreCase(propertyDescriptorName)) {
             return SFTPTransfer.DISABLE_DIRECTORY_LISTING;
         }
-        return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
+        return super.
+                getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 111dead..d061c33 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -67,47 +67,54 @@ public class ReplaceText extends AbstractProcessor {
     private final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)");
     private static final byte[] ZERO_BYTE_BUFFER = new byte[0];
     // Properties
-    public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder()
-            .name("Regular Expression")
-            .description("The Regular Expression to search for in the FlowFile content")
-            .required(true)
-            .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
-            .expressionLanguageSupported(true)
-            .defaultValue("(.*)")
-            .build();
-    public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
-            .name("Replacement Value")
-            .description("The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.")
-            .required(true)
-            .defaultValue("$1")
-            .addValidator(Validator.VALID)
-            .expressionLanguageSupported(true)
-            .build();
-    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("The Character Set in which the file is encoded")
-            .required(true)
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .defaultValue("UTF-8")
-            .build();
-    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
-            .name("Maximum Buffer Size")
-            .description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
-                    + "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .build();
-    public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
-            .name("Evaluation Mode")
-            .description("Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.")
-            .allowableValues(LINE_BY_LINE, ENTIRE_TEXT)
-            .defaultValue(ENTIRE_TEXT)
-            .required(true)
-            .build();
+    public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder().
+            name("Regular Expression").
+            description("The Regular Expression to search for in the FlowFile content").
+            required(true).
+            addValidator(StandardValidators.
+                    createRegexValidator(0, Integer.MAX_VALUE, true)).
+            expressionLanguageSupported(true).
+            defaultValue("(.*)").
+            build();
+    public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder().
+            name("Replacement Value").
+            description("The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.").
+            required(true).
+            defaultValue("$1").
+            addValidator(Validator.VALID).
+            expressionLanguageSupported(true).
+            build();
+    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder().
+            name("Character Set").
+            description("The Character Set in which the file is encoded").
+            required(true).
+            addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).
+            defaultValue("UTF-8").
+            build();
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder().
+            name("Maximum Buffer Size").
+            description("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                    + "In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'").
+            required(true).
+            addValidator(StandardValidators.DATA_SIZE_VALIDATOR).
+            defaultValue("1 MB").
+            build();
+    public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder().
+            name("Evaluation Mode").
+            description("Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.").
+            allowableValues(LINE_BY_LINE, ENTIRE_TEXT).
+            defaultValue(ENTIRE_TEXT).
+            required(true).
+            build();
     // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression").build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that could not be updated are routed to this relationship").build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().
+            name("success").
+            description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression").
+            build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().
+            name("failure").
+            description("FlowFiles that could not be updated are routed to this relationship").
+            build();
     //
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
@@ -140,15 +147,19 @@ public class ReplaceText extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 100));
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.
+                newSizeBasedFilter(1, DataUnit.MB, 100));
         if (flowFiles.isEmpty()) {
             return;
         }
 
         final ProcessorLog logger = getLogger();
-        final String unsubstitutedRegex = context.getProperty(REGEX).getValue();
-        String unsubstitutedReplacement = context.getProperty(REPLACEMENT_VALUE).getValue();
-        if (unsubstitutedRegex.equals("(.*)") && unsubstitutedReplacement.equals("$1")) {
+        final String unsubstitutedRegex = context.getProperty(REGEX).
+                getValue();
+        String unsubstitutedReplacement = context.getProperty(REPLACEMENT_VALUE).
+                getValue();
+        if (unsubstitutedRegex.equals("(.*)") && unsubstitutedReplacement.
+                equals("$1")) {
             // This pattern says replace content with itself. We can highly optimize this process by simply transferring
             // all FlowFiles to the 'success' relationship
             session.transfer(flowFiles, REL_SUCCESS);
@@ -169,17 +180,26 @@ public class ReplaceText extends AbstractProcessor {
             }
         };
 
-        final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
-        final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
+        final String regexValue = context.getProperty(REGEX).
+                evaluateAttributeExpressions().
+                getValue();
+        final int numCapturingGroups = Pattern.compile(regexValue).
+                matcher("").
+                groupCount();
 
         final boolean skipBuffer = ".*".equals(unsubstitutedRegex);
 
-        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
-        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final Charset charset = Charset.forName(context.
+                getProperty(CHARACTER_SET).
+                getValue());
+        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).
+                asDataSize(DataUnit.B).
+                intValue();
 
         final byte[] buffer = skipBuffer ? ZERO_BYTE_BUFFER : new byte[maxBufferSize];
 
-        final String evaluateMode = context.getProperty(EVALUATION_MODE).getValue();
+        final String evaluateMode = context.getProperty(EVALUATION_MODE).
+                getValue();
 
         for (FlowFile flowFile : flowFiles) {
             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
@@ -189,8 +209,11 @@ public class ReplaceText extends AbstractProcessor {
                 }
             }
 
-            String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, escapeBackRefDecorator).getValue();
-            final Matcher backRefMatcher = backReferencePattern.matcher(replacement);
+            String replacement = context.getProperty(REPLACEMENT_VALUE).
+                    evaluateAttributeExpressions(flowFile, escapeBackRefDecorator).
+                    getValue();
+            final Matcher backRefMatcher = backReferencePattern.
+                    matcher(replacement);
             while (backRefMatcher.find()) {
                 final String backRefNum = backRefMatcher.group(1);
                 if (backRefNum.startsWith("0")) {
@@ -208,7 +231,8 @@ public class ReplaceText extends AbstractProcessor {
                 }
 
                 if (backRefIndex > numCapturingGroups) {
-                    final StringBuilder sb = new StringBuilder(replacement.length() + 1);
+                    final StringBuilder sb = new StringBuilder(replacement.
+                            length() + 1);
                     final int groupStart = backRefMatcher.start(1);
 
                     sb.append(replacement.substring(0, groupStart - 1));
@@ -226,12 +250,14 @@ public class ReplaceText extends AbstractProcessor {
             if (skipBuffer) {
                 final StopWatch stopWatch = new StopWatch(true);
                 if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
-                    flowFile = session.write(flowFile, new OutputStreamCallback() {
-                        @Override
-                        public void process(final OutputStream out) throws IOException {
-                            out.write(replacementValue.getBytes(charset));
-                        }
-                    });
+                    flowFile = session.
+                            write(flowFile, new OutputStreamCallback() {
+                                @Override
+                                public void process(final OutputStream out) throws IOException {
+                                    out.
+                                    write(replacementValue.getBytes(charset));
+                                }
+                            });
                 } else {
                     flowFile = session.write(flowFile, new StreamCallback() {
                         @Override
@@ -245,14 +271,19 @@ public class ReplaceText extends AbstractProcessor {
                         }
                     });
                 }
-                session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                session.getProvenanceReporter().
+                        modifyContent(flowFile, stopWatch.
+                                getElapsed(TimeUnit.MILLISECONDS));
                 session.transfer(flowFile, REL_SUCCESS);
-                logger.info("Transferred {} to 'success'", new Object[]{flowFile});
+                logger.
+                        info("Transferred {} to 'success'", new Object[]{flowFile});
                 continue;
             }
 
             final StopWatch stopWatch = new StopWatch(true);
-            final String regex = context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
+            final String regex = context.getProperty(REGEX).
+                    evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).
+                    getValue();
 
             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                 final int flowFileSize = (int) flowFile.getSize();
@@ -261,7 +292,8 @@ public class ReplaceText extends AbstractProcessor {
                     public void process(final InputStream in, final OutputStream out) throws IOException {
                         StreamUtils.fillBuffer(in, buffer, false);
                         final String contentString = new String(buffer, 0, flowFileSize, charset);
-                        final String updatedValue = contentString.replaceAll(regex, replacementValue);
+                        final String updatedValue = contentString.
+                                replaceAll(regex, replacementValue);
                         out.write(updatedValue.getBytes(charset));
                     }
                 });
@@ -273,7 +305,8 @@ public class ReplaceText extends AbstractProcessor {
                                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
                             String oneLine;
                             while (null != (oneLine = br.readLine())) {
-                                final String updatedValue = oneLine.replaceAll(regex, replacementValue);
+                                final String updatedValue = oneLine.
+                                        replaceAll(regex, replacementValue);
                                 bw.write(updatedValue);
                             }
                         }
@@ -282,7 +315,9 @@ public class ReplaceText extends AbstractProcessor {
             }
 
             logger.info("Transferred {} to 'success'", new Object[]{flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.getProvenanceReporter().
+                    modifyContent(flowFile, stopWatch.
+                            getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
index c4dd83a..a8a2919 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
@@ -71,52 +71,62 @@ import org.apache.commons.lang3.StringUtils;
 @CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression against it and replacing the section of the content that matches the Regular Expression with some alternate value provided in a mapping file.")
 public class ReplaceTextWithMapping extends AbstractProcessor {
 
-    public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder()
-            .name("Regular Expression")
-            .description("The Regular Expression to search for in the FlowFile content")
-            .required(true)
-            .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
-            .expressionLanguageSupported(true)
-            .defaultValue("\\S+")
-            .build();
-    public static final PropertyDescriptor MATCHING_GROUP_FOR_LOOKUP_KEY = new PropertyDescriptor.Builder()
-            .name("Matching Group")
-            .description("The number of the matching group of the provided regex to replace with the corresponding value from the mapping file (if it exists).")
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .required(true)
-            .expressionLanguageSupported(true)
-            .defaultValue("0").build();
-    public static final PropertyDescriptor MAPPING_FILE = new PropertyDescriptor.Builder()
-            .name("Mapping File")
-            .description("The name of the file (including the full path) containing the Mappings.")
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .required(true)
-            .build();
-    public static final PropertyDescriptor MAPPING_FILE_REFRESH_INTERVAL = new PropertyDescriptor.Builder()
-            .name("Mapping File Refresh Interval")
-            .description("The polling interval in seconds to check for updates to the mapping file. The default is 60s.")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .required(true)
-            .defaultValue("60s").build();
-    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("The Character Set in which the file is encoded")
-            .required(true)
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .defaultValue("UTF-8")
-            .build();
-    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
-            .name("Maximum Buffer Size")
-            .description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. If a FlowFile is larger than this value, the FlowFile will be routed to 'failure'")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression").build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that could not be updated are routed to this relationship").build();
-
-    private final Pattern backReferencePattern = Pattern.compile("[^\\\\]\\$(\\d+)");
+    public static final PropertyDescriptor REGEX = new PropertyDescriptor.Builder().
+            name("Regular Expression").
+            description("The Regular Expression to search for in the FlowFile content").
+            required(true).
+            addValidator(StandardValidators.
+                    createRegexValidator(0, Integer.MAX_VALUE, true)).
+            expressionLanguageSupported(true).
+            defaultValue("\\S+").
+            build();
+    public static final PropertyDescriptor MATCHING_GROUP_FOR_LOOKUP_KEY = new PropertyDescriptor.Builder().
+            name("Matching Group").
+            description("The number of the matching group of the provided regex to replace with the corresponding value from the mapping file (if it exists).").
+            addValidator(StandardValidators.INTEGER_VALIDATOR).
+            required(true).
+            expressionLanguageSupported(true).
+            defaultValue("0").
+            build();
+    public static final PropertyDescriptor MAPPING_FILE = new PropertyDescriptor.Builder().
+            name("Mapping File").
+            description("The name of the file (including the full path) containing the Mappings.").
+            addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).
+            required(true).
+            build();
+    public static final PropertyDescriptor MAPPING_FILE_REFRESH_INTERVAL = new PropertyDescriptor.Builder().
+            name("Mapping File Refresh Interval").
+            description("The polling interval in seconds to check for updates to the mapping file. The default is 60s.").
+            addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).
+            required(true).
+            defaultValue("60s").
+            build();
+    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder().
+            name("Character Set").
+            description("The Character Set in which the file is encoded").
+            required(true).
+            addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).
+            defaultValue("UTF-8").
+            build();
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder().
+            name("Maximum Buffer Size").
+            description("Specifies the maximum amount of data to buffer (per file) in order to apply the regular expressions. If a FlowFile is larger than this value, the FlowFile will be routed to 'failure'").
+            required(true).
+            addValidator(StandardValidators.DATA_SIZE_VALIDATOR).
+            defaultValue("1 MB").
+            build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().
+            name("success").
+            description("FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression").
+            build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().
+            name("failure").
+            description("FlowFiles that could not be updated are routed to this relationship").
+            build();
+
+    private final Pattern backReferencePattern = Pattern.
+            compile("[^\\\\]\\$(\\d+)");
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
@@ -129,14 +139,26 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
-        final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
-
-        final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
-        final int numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
-        final int groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();
+        final List<ValidationResult> errors = new ArrayList<>(super.
+                customValidate(context));
+
+        final String regexValue = context.getProperty(REGEX).
+                evaluateAttributeExpressions().
+                getValue();
+        final int numCapturingGroups = Pattern.compile(regexValue).
+                matcher("").
+                groupCount();
+        final int groupToMatch = context.
+                getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).
+                evaluateAttributeExpressions().
+                asInteger();
 
         if (groupToMatch > numCapturingGroups) {
-            errors.add(new ValidationResult.Builder().subject("Insufficient Matching Groups").valid(false).explanation("The specified matching group does not exist for the regular expression provided").build());
+            errors.add(new ValidationResult.Builder().
+                    subject("Insufficient Matching Groups").
+                    valid(false).
+                    explanation("The specified matching group does not exist for the regular expression provided").
+                    build());
         }
         return errors;
     }
@@ -178,7 +200,9 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
 
         final ProcessorLog logger = getLogger();
 
-        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).
+                asDataSize(DataUnit.B).
+                intValue();
 
         for (FlowFile flowFile : flowFiles) {
             if (flowFile.getSize() > maxBufferSize) {
@@ -188,10 +212,13 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
 
             final StopWatch stopWatch = new StopWatch(true);
 
-            flowFile = session.write(flowFile, new ReplaceTextCallback(context, flowFile, maxBufferSize));
+            flowFile = session.
+                    write(flowFile, new ReplaceTextCallback(context, flowFile, maxBufferSize));
 
             logger.info("Transferred {} to 'success'", new Object[]{flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.getProvenanceReporter().
+                    modifyContent(flowFile, stopWatch.
+                            getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
         }
     }
@@ -225,34 +252,42 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
                 // if not queried mapping file lastUpdate time in
                 // mapppingRefreshPeriodSecs, do so.
                 long currentTimeSecs = System.currentTimeMillis() / 1000;
-                long mappingRefreshPeriodSecs = context.getProperty(MAPPING_FILE_REFRESH_INTERVAL).asTimePeriod(TimeUnit.SECONDS);
+                long mappingRefreshPeriodSecs = context.
+                        getProperty(MAPPING_FILE_REFRESH_INTERVAL).
+                        asTimePeriod(TimeUnit.SECONDS);
 
                 boolean retry = (currentTimeSecs > (mappingTestTime.get() + mappingRefreshPeriodSecs));
                 if (retry) {
                     mappingTestTime.set(System.currentTimeMillis() / 1000);
                     // see if the mapping file needs to be reloaded
-                    final String fileName = context.getProperty(MAPPING_FILE).getValue();
+                    final String fileName = context.getProperty(MAPPING_FILE).
+                            getValue();
                     final File file = new File(fileName);
                     if (file.exists() && file.isFile() && file.canRead()) {
                         if (file.lastModified() > lastModified.get()) {
                             lastModified.getAndSet(file.lastModified());
                             try (FileInputStream is = new FileInputStream(file)) {
-                                logger.info("Reloading mapping file: {}", new Object[]{fileName});
+                                logger.
+                                        info("Reloading mapping file: {}", new Object[]{fileName});
 
                                 final Map<String, String> mapping = loadMappingFile(is);
                                 final ConfigurationState newState = new ConfigurationState(
                                         mapping);
                                 configurationStateRef.set(newState);
                             } catch (IOException e) {
-                                logger.error("Error reading mapping file: {}", new Object[]{e.getMessage()});
+                                logger.
+                                        error("Error reading mapping file: {}", new Object[]{e.
+                                            getMessage()});
                             }
                         }
                     } else {
-                        logger.error("Mapping file does not exist or is not readable: {}", new Object[]{fileName});
+                        logger.
+                                error("Mapping file does not exist or is not readable: {}", new Object[]{fileName});
                     }
                 }
             } catch (Exception e) {
-                logger.error("Error loading mapping file: {}", new Object[]{e.getMessage()});
+                logger.error("Error loading mapping file: {}", new Object[]{e.
+                    getMessage()});
             } finally {
                 processorLock.unlock();
             }
@@ -263,7 +298,7 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
      * Loads a file containing mappings.
      *
      * @param is
-     * @return 
+     * @return
      * @throws IOException
      */
     protected Map<String, String> loadMappingFile(InputStream is) throws IOException {
@@ -319,24 +354,34 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
         };
 
         private ReplaceTextCallback(ProcessContext context, FlowFile flowFile, int maxBufferSize) {
-            this.regex = context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
+            this.regex = context.getProperty(REGEX).
+                    evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).
+                    getValue();
             this.flowFile = flowFile;
 
-            this.charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+            this.charset = Charset.forName(context.getProperty(CHARACTER_SET).
+                    getValue());
 
-            final String regexValue = context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
-            this.numCapturingGroups = Pattern.compile(regexValue).matcher("").groupCount();
+            final String regexValue = context.getProperty(REGEX).
+                    evaluateAttributeExpressions().
+                    getValue();
+            this.numCapturingGroups = Pattern.compile(regexValue).
+                    matcher("").
+                    groupCount();
 
             this.buffer = new byte[maxBufferSize];
 
-            this.groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).evaluateAttributeExpressions().asInteger();
+            this.groupToMatch = context.
+                    getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY).
+                    evaluateAttributeExpressions().
+                    asInteger();
         }
 
         @Override
         public void process(final InputStream in, final OutputStream out) throws IOException {
 
-            final Map<String, String> mapping = configurationStateRef.get()
-                    .getMapping();
+            final Map<String, String> mapping = configurationStateRef.get().
+                    getMapping();
 
             StreamUtils.fillBuffer(in, buffer, false);
 
@@ -344,7 +389,8 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
 
             final String contentString = new String(buffer, 0, flowFileSize, charset);
 
-            final Matcher matcher = Pattern.compile(regex).matcher(contentString);
+            final Matcher matcher = Pattern.compile(regex).
+                    matcher(contentString);
 
             matcher.reset();
             boolean result = matcher.find();
@@ -355,26 +401,37 @@ public class ReplaceTextWithMapping extends AbstractProcessor {
                     String rv = mapping.get(matched);
 
                     if (rv == null) {
-                        String replacement = matcher.group().replace("$", "\\$");
+                        String replacement = matcher.group().
+                                replace("$", "\\$");
                         matcher.appendReplacement(sb, replacement);
                     } else {
                         String allRegexMatched = matcher.group(); //this is everything that matched the regex
 
-                        int scaledStart = matcher.start(groupToMatch) - matcher.start();
-                        int scaledEnd = scaledStart + matcher.group(groupToMatch).length();
+                        int scaledStart = matcher.start(groupToMatch) - matcher.
+                                start();
+                        int scaledEnd = scaledStart + matcher.
+                                group(groupToMatch).
+                                length();
 
                         StringBuilder replacementBuilder = new StringBuilder();
 
-                        replacementBuilder.append(allRegexMatched.substring(0, scaledStart).replace("$", "\\$"));
-                        replacementBuilder.append(fillReplacementValueBackReferences(rv, numCapturingGroups));
-                        replacementBuilder.append(allRegexMatched.substring(scaledEnd).replace("$", "\\$"));
-
-                        matcher.appendReplacement(sb, replacementBuilder.toString());
+                        replacementBuilder.append(allRegexMatched.
+                                substring(0, scaledStart).
+                                replace("$", "\\$"));
+                        replacementBuilder.
+                                append(fillReplacementValueBackReferences(rv, numCapturingGroups));
+                        replacementBuilder.append(allRegexMatched.
+                                substring(scaledEnd).
+                                replace("$", "\\$"));
+
+                        matcher.appendReplacement(sb, replacementBuilder.
+                                toString());
                     }
                     result = matcher.find();
                 } while (result);
                 matcher.appendTail(sb);
-                out.write(sb.toString().getBytes(charset));
+                out.write(sb.toString().
+                        getBytes(charset));
                 return;
             }
             out.write(contentString.getBytes(charset));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
index 374e6ec..ff231d7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteOnAttribute.java
@@ -67,9 +67,9 @@ import org.apache.nifi.processor.util.StandardValidators;
 @SupportsBatching
 @Tags({"attributes", "routing", "Attribute Expression Language", "regexp", "regex", "Regular Expression", "Expression Language"})
 @CapabilityDescription("Routes FlowFiles based on their Attributes using the Attribute Expression Language")
-@DynamicProperty(name="Relationship Name", value="Attribute Expression Language", supportsExpressionLanguage=true, description="Routes FlowFiles whose " + 
-"attributes match the Attribute Expression Language specified in the Dynamic Property Value to the Relationship specified in the Dynamic Property Key")
-@DynamicRelationship(name="Name from Dynamic Property", description="FlowFiles that match the Dynamic Property's Attribute Expression Language")
+@DynamicProperty(name = "Relationship Name", value = "Attribute Expression Language", supportsExpressionLanguage = true, description = "Routes FlowFiles whose "
+        + "attributes match the Attribute Expression Language specified in the Dynamic Property Value to the Relationship specified in the Dynamic Property Key")
+@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's Attribute Expression Language")
 public class RouteOnAttribute extends AbstractProcessor {
 
     public static final String ROUTE_ATTRIBUTE_KEY = "RouteOnAttribute.Route";
@@ -95,26 +95,27 @@ public class RouteOnAttribute extends AbstractProcessor {
             "Requires that at least one user-defined expression evaluate to 'true' for hte FlowFile to be considered a match"
     );
 
-    public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Routing Strategy")
-            .description("Specifies how to determine which relationship to use when evaluating the Expression Language")
-            .required(true)
-            .allowableValues(ROUTE_PROPERTY_NAME, ROUTE_ALL_MATCH, ROUTE_ANY_MATCHES)
-            .defaultValue(ROUTE_PROPERTY_NAME.getValue())
-            .build();
+    public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder().
+            name("Routing Strategy").
+            description("Specifies how to determine which relationship to use when evaluating the Expression Language").
+            required(true).
+            allowableValues(ROUTE_PROPERTY_NAME, ROUTE_ALL_MATCH, ROUTE_ANY_MATCHES).
+            defaultValue(ROUTE_PROPERTY_NAME.getValue()).
+            build();
 
     public static final Relationship REL_NO_MATCH = new Relationship.Builder()
-            .name("unmatched")
-            .description("FlowFiles that do not match any user-define expression will be routed here")
-            .build();
+            .name("unmatched").
+            description("FlowFiles that do not match any user-define expression will be routed here").
+            build();
     public static final Relationship REL_MATCH = new Relationship.Builder()
-            .name("matched")
-            .description("FlowFiles will be routed to 'match' if one or all Expressions match, depending on the configuration of the Routing Strategy property")
-            .build();
+            .name("matched").
+            description("FlowFiles will be routed to 'match' if one or all Expressions match, depending on the configuration of the Routing Strategy property").
+            build();
 
     private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
     private List<PropertyDescriptor> properties;
-    private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
+    private volatile String configuredRouteStrategy = ROUTE_STRATEGY.
+            getDefaultValue();
     private volatile Set<String> dynamicPropertyNames = new HashSet<>();
 
     @Override
@@ -141,12 +142,13 @@ public class RouteOnAttribute extends AbstractProcessor {
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .required(false)
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.BOOLEAN, false))
-                .dynamic(true)
-                .expressionLanguageSupported(true)
-                .build();
+                .required(false).
+                name(propertyDescriptorName).
+                addValidator(StandardValidators.
+                        createAttributeExpressionLanguageValidator(ResultType.BOOLEAN, false)).
+                dynamic(true).
+                expressionLanguageSupported(true).
+                build();
     }
 
     @Override
@@ -161,7 +163,8 @@ public class RouteOnAttribute extends AbstractProcessor {
                 newDynamicPropertyNames.add(descriptor.getName());
             }
 
-            this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
+            this.dynamicPropertyNames = Collections.
+                    unmodifiableSet(newDynamicPropertyNames);
         }
 
         // formulate the new set of Relationships
@@ -170,7 +173,8 @@ public class RouteOnAttribute extends AbstractProcessor {
         final String routeStrategy = configuredRouteStrategy;
         if (ROUTE_PROPERTY_NAME.equals(routeStrategy)) {
             for (final String propName : allDynamicProps) {
-                newRelationships.add(new Relationship.Builder().name(propName).build());
+                newRelationships.add(new Relationship.Builder().name(propName).
+                        build());
             }
         } else {
             newRelationships.add(REL_MATCH);
@@ -189,26 +193,32 @@ public class RouteOnAttribute extends AbstractProcessor {
 
         final ProcessorLog logger = getLogger();
         final Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
-        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+        for (final PropertyDescriptor descriptor : context.getProperties().
+                keySet()) {
             if (!descriptor.isDynamic()) {
                 continue;
             }
 
-            propertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
+            propertyMap.put(new Relationship.Builder().
+                    name(descriptor.getName()).
+                    build(), context.getProperty(descriptor));
         }
 
         final Set<Relationship> matchingRelationships = new HashSet<>();
-        for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.entrySet()) {
+        for (final Map.Entry<Relationship, PropertyValue> entry : propertyMap.
+                entrySet()) {
             final PropertyValue value = entry.getValue();
 
-            final boolean matches = value.evaluateAttributeExpressions(flowFile).asBoolean();
+            final boolean matches = value.evaluateAttributeExpressions(flowFile).
+                    asBoolean();
             if (matches) {
                 matchingRelationships.add(entry.getKey());
             }
         }
 
         final Set<Relationship> destinationRelationships = new HashSet<>();
-        switch (context.getProperty(ROUTE_STRATEGY).getValue()) {
+        switch (context.getProperty(ROUTE_STRATEGY).
+                getValue()) {
             case routeAllMatchValue:
                 if (matchingRelationships.size() == propertyMap.size()) {
                     destinationRelationships.add(REL_MATCH);
@@ -231,35 +241,52 @@ public class RouteOnAttribute extends AbstractProcessor {
 
         if (destinationRelationships.isEmpty()) {
             logger.info(this + " routing " + flowFile + " to unmatched");
-            flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.getName());
-            session.getProvenanceReporter().route(flowFile, REL_NO_MATCH);
+            flowFile = session.
+                    putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_NO_MATCH.
+                            getName());
+            session.getProvenanceReporter().
+                    route(flowFile, REL_NO_MATCH);
             session.transfer(flowFile, REL_NO_MATCH);
         } else {
-            final Iterator<Relationship> relationshipNameIterator = destinationRelationships.iterator();
-            final Relationship firstRelationship = relationshipNameIterator.next();
+            final Iterator<Relationship> relationshipNameIterator = destinationRelationships.
+                    iterator();
+            final Relationship firstRelationship = relationshipNameIterator.
+                    next();
             final Map<Relationship, FlowFile> transferMap = new HashMap<>();
             final Set<FlowFile> clones = new HashSet<>();
 
             // make all the clones for any remaining relationships
             while (relationshipNameIterator.hasNext()) {
-                final Relationship relationship = relationshipNameIterator.next();
+                final Relationship relationship = relationshipNameIterator.
+                        next();
                 final FlowFile cloneFlowFile = session.clone(flowFile);
                 clones.add(cloneFlowFile);
                 transferMap.put(relationship, cloneFlowFile);
             }
 
             // now transfer any clones generated
-            for (final Map.Entry<Relationship, FlowFile> entry : transferMap.entrySet()) {
-                logger.info(this + " cloned " + flowFile + " into " + entry.getValue() + " and routing clone to relationship " + entry.getKey());
-                FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
-                session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
+            for (final Map.Entry<Relationship, FlowFile> entry : transferMap.
+                    entrySet()) {
+                logger.info(this + " cloned " + flowFile + " into " + entry.
+                        getValue() + " and routing clone to relationship " + entry.
+                        getKey());
+                FlowFile updatedFlowFile = session.
+                        putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.
+                                getKey().
+                                getName());
+                session.getProvenanceReporter().
+                        route(updatedFlowFile, entry.getKey());
                 session.transfer(updatedFlowFile, entry.getKey());
             }
 
             //now transfer the original flow file
-            logger.info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
-            session.getProvenanceReporter().route(flowFile, firstRelationship);
-            flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.getName());
+            logger.
+                    info("Routing {} to {}", new Object[]{flowFile, firstRelationship});
+            session.getProvenanceReporter().
+                    route(flowFile, firstRelationship);
+            flowFile = session.
+                    putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, firstRelationship.
+                            getName());
             session.transfer(flowFile, firstRelationship);
         }
     }