You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/27 13:45:24 UTC

[13/16] incubator-nifi git commit: NIFI-271 chipping away - more work left in standard bundle

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index c8345d2..8f4286b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -27,7 +27,6 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -72,13 +71,14 @@ import org.joda.time.format.DateTimeFormatter;
 @SupportsBatching
 @Tags({"http", "https", "rest", "client"})
 @CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.")
-@WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
-        @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
-        @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
-        @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
-        @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
-        @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server") })
-@DynamicProperty(name="Trusted Hostname", value="A hostname", description="Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
+@WritesAttributes({
+    @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"),
+    @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"),
+    @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"),
+    @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"),
+    @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
+    @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")})
+@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
         + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
 public final class InvokeHTTP extends AbstractProcessor {
 
@@ -88,7 +88,7 @@ public final class InvokeHTTP extends AbstractProcessor {
 
         return Config.PROPERTIES;
     }
-    
+
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
         if (Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName)) {
@@ -118,7 +118,8 @@ public final class InvokeHTTP extends AbstractProcessor {
             } else {
                 SSLContextService svc = (SSLContextService) getControllerServiceLookup().getControllerService(newValue);
                 sslContextRef.set(svc.createSSLContext(ClientAuth.NONE));  // ClientAuth is only useful for servers, not clients.
-                getLogger().info("Loading SSL configuration from keystore={} and truststore={}", new Object[]{svc.getKeyStoreFile(), svc.getTrustStoreFile()});
+                getLogger().info("Loading SSL configuration from keystore={} and truststore={}",
+                        new Object[]{svc.getKeyStoreFile(), svc.getTrustStoreFile()});
             }
         }
 
@@ -143,17 +144,14 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         for (FlowFile flowfile : flowfiles) {
-            Transaction transaction = new Transaction(
-                    getLogger(), sslContextRef, attributesToSendRef, context, session, flowfile
-            );
+            Transaction transaction = new Transaction(getLogger(), sslContextRef, attributesToSendRef, context, session, flowfile);
             transaction.process();
         }
     }
 
     /**
      *
-     * Stores properties, relationships, configuration values, hard coded
-     * strings, magic numbers, etc.
+     * Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc.
      *
      *
      */
@@ -174,13 +172,13 @@ public final class InvokeHTTP extends AbstractProcessor {
         // processing, including when converting http headers, copying attributes, etc.
         // This set includes our strings defined above as well as some standard flowfile
         // attributes.
-        Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
                 STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
                 "uuid", "filename", "path"
         )));
 
         //-- properties --//
-        PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
                 .name("HTTP Method")
                 .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
                 .required(true)
@@ -189,7 +187,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
                 .name("Remote URL")
                 .description("Remote URL which will be connected to, including scheme, host, port, path.")
                 .required(true)
@@ -197,7 +195,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.URL_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
                 .name("Connection Timeout")
                 .description("Max wait time for connection to remote service.")
                 .required(true)
@@ -205,7 +203,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
                 .name("Read Timeout")
                 .description("Max wait time for response from remote service.")
                 .required(true)
@@ -213,7 +211,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
                 .name("Include Date Header")
                 .description("Include an RFC-2616 Date header in the request.")
                 .required(true)
@@ -222,7 +220,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
                 .name("Follow Redirects")
                 .description("Follow HTTP redirects issued by remote server.")
                 .required(true)
@@ -231,7 +229,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
                 .name("Attributes to Send")
                 .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
                         + "If not defined, no attributes are sent as headers.")
@@ -239,27 +237,28 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
                 .build();
 
-        PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
                 .name("SSL Context Service")
                 .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
                 .required(false)
                 .identifiesControllerService(SSLContextService.class)
                 .build();
-        
-        List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
-                PROP_METHOD,
-                PROP_URL,
-                PROP_SSL_CONTEXT_SERVICE,
-                PROP_CONNECT_TIMEOUT,
-                PROP_READ_TIMEOUT,
-                PROP_DATE_HEADER,
-                PROP_FOLLOW_REDIRECTS,
-                PROP_ATTRIBUTES_TO_SEND
-        ));
+
+        public static final List<PropertyDescriptor> PROPERTIES = Collections.
+                unmodifiableList(Arrays.asList(
+                                PROP_METHOD,
+                                PROP_URL,
+                                PROP_SSL_CONTEXT_SERVICE,
+                                PROP_CONNECT_TIMEOUT,
+                                PROP_READ_TIMEOUT,
+                                PROP_DATE_HEADER,
+                                PROP_FOLLOW_REDIRECTS,
+                                PROP_ATTRIBUTES_TO_SEND
+                        ));
 
         // property to allow the hostname verifier to be overridden
         // this is a "hidden" property - it's configured using a dynamic user property
-        PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
+        public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
                 .name("Trusted Hostname")
                 .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
                         + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
@@ -268,60 +267,53 @@ public final class InvokeHTTP extends AbstractProcessor {
                 .build();
 
         //-- relationships --//
-        Relationship REL_SUCCESS_REQ = new Relationship.Builder()
+        public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
                 .name("Original")
                 .description("Original FlowFile will be routed upon success (2xx status codes).")
                 .build();
 
-        Relationship REL_SUCCESS_RESP = new Relationship.Builder()
+        public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder()
                 .name("Response")
                 .description("Response FlowFile will be routed upon success (2xx status codes).")
                 .build();
 
-        Relationship REL_RETRY = new Relationship.Builder()
+        public static final Relationship REL_RETRY = new Relationship.Builder()
                 .name("Retry")
                 .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
                 .build();
 
-        Relationship REL_NO_RETRY = new Relationship.Builder()
+        public static final Relationship REL_NO_RETRY = new Relationship.Builder()
                 .name("No Retry")
                 .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
                 .build();
 
-        Relationship REL_FAILURE = new Relationship.Builder()
+        public static final Relationship REL_FAILURE = new Relationship.Builder()
                 .name("Failure")
                 .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
                 .build();
 
-        Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+        public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
                 REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE
         )));
 
     }
 
     /**
-     * A single invocation of an HTTP request/response from the InvokeHTTP
-     * processor. This class encapsulates the entirety of the flowfile
-     * processing.
+     * A single invocation of an HTTP request/response from the InvokeHTTP processor. This class encapsulates the entirety of the flowfile processing.
      * <p>
-     * This class is not thread safe and is created new for every flowfile
-     * processed.
+     * This class is not thread safe and is created new for every flowfile processed.
      */
     private static class Transaction implements Config {
 
         /**
-         * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is
-         * used by the HTTP Date header and is optionally sent by the processor.
-         * This date is effectively an RFC 822/1123 date string, but HTTP
-         * requires it to be in GMT (preferring the literal 'GMT' string).
+         * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
+         * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
          */
         private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
         private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC();
 
         /**
-         * Every request/response cycle from this client has a unique
-         * transaction id which will be stored as a flowfile attribute. This
-         * generator is used to create the id.
+         * Every request/response cycle from this client has a unique transaction id which will be stored as a flowfile attribute. This generator is used to create the id.
          */
         private static final AtomicLong txIdGenerator = new AtomicLong();
 
@@ -507,11 +499,8 @@ public final class InvokeHTTP extends AbstractProcessor {
             }
 
             // log the status codes from the response
-            logger.info("Request to {} returned status code {} for {}", new Object[]{
-                conn.getURL().toExternalForm(),
-                statusCode,
-                request
-            });
+            logger.info("Request to {} returned status code {} for {}",
+                    new Object[]{conn.getURL().toExternalForm(), statusCode, request});
 
             // transfer to the correct relationship
             // 2xx -> SUCCESS
@@ -565,13 +554,13 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         /**
-         * Returns a Map of flowfile attributes from the response http headers.
-         * Multivalue headers are naively converted to comma separated strings.
+         * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
          */
         private Map<String, String> convertAttributesFromHeaders() throws IOException {
             // create a new hashmap to store the values from the connection
             Map<String, String> map = new HashMap<>();
-            for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().entrySet()) {
+            for (Map.Entry<String, List<String>> entry : conn.getHeaderFields().
+                    entrySet()) {
                 String key = entry.getKey();
                 if (key == null) {
                     continue;
@@ -610,17 +599,13 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         private void logRequest() {
-            logger.debug("\nRequest to remote service:\n\t{}\n{}", new Object[]{
-                conn.getURL().toExternalForm(),
-                getLogString(conn.getRequestProperties())
-            });
+            logger.debug("\nRequest to remote service:\n\t{}\n{}",
+                    new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
         }
 
         private void logResponse() {
-            logger.debug("\nResponse from remote service:\n\t{}\n{}", new Object[]{
-                conn.getURL().toExternalForm(),
-                getLogString(conn.getHeaderFields())
-            });
+            logger.debug("\nResponse from remote service:\n\t{}\n{}",
+                    new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
         }
 
         private String getLogString(Map<String, List<String>> map) {
@@ -644,12 +629,9 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         /**
-         * Convert a collection of string values into a overly simple comma
-         * separated string.
+         * Convert a collection of string values into a overly simple comma separated string.
          *
-         * Does not handle the case where the value contains the delimiter. i.e.
-         * if a value contains a comma, this method does nothing to try and
-         * escape or quote the value, in traditional csv style.
+         * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
          */
         private String csv(Collection<String> values) {
             if (values == null || values.isEmpty()) {
@@ -674,16 +656,14 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         /**
-         * Return the current datetime as an RFC 1123 formatted string in the
-         * GMT tz.
+         * Return the current datetime as an RFC 1123 formatted string in the GMT tz.
          */
         private String getDateValue() {
             return dateFormat.print(System.currentTimeMillis());
         }
 
         /**
-         * Returns a string from the input stream using the specified character
-         * encoding.
+         * Returns a string from the input stream using the specified character encoding.
          */
         private String toString(InputStream is, Charset charset) throws IOException {
             if (is == null) {
@@ -700,13 +680,9 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         /**
-         * Returns the input stream to use for reading from the remote server.
-         * We're either going to want the inputstream or errorstream,
-         * effectively depending on the status code.
+         * Returns the input stream to use for reading from the remote server. We're either going to want the inputstream or errorstream, effectively depending on the status code.
          * <p>
-         * This method can return null if there is no inputstream to read from.
-         * For example, if the remote server did not send a message body. eg.
-         * 204 No Content or 304 Not Modified
+         * This method can return null if there is no inputstream to read from. For example, if the remote server did not send a message body. eg. 204 No Content or 304 Not Modified
          */
         private InputStream getResponseStream() {
             try {
@@ -723,8 +699,7 @@ public final class InvokeHTTP extends AbstractProcessor {
         }
 
         /**
-         * Writes the status attributes onto the flowfile, returning the
-         * flowfile that was updated.
+         * Writes the status attributes onto the flowfile, returning the flowfile that was updated.
          */
         private FlowFile writeStatusAttributes(FlowFile flowfile) {
             flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index e4bbaec..6b3283c 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -65,9 +65,11 @@ import org.apache.nifi.util.StopWatch;
 public abstract class JmsConsumer extends AbstractProcessor {
 
     public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
-    
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
-            .description("All FlowFiles are routed to success").build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles are routed to success")
+            .build();
 
     private final Set<Relationship> relationships;
     private final List<PropertyDescriptor> propertyDescriptors;
@@ -112,7 +114,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
 
         final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
-        
+
         final StopWatch stopWatch = new StopWatch(true);
         for (int i = 0; i < batchSize; i++) {
 
@@ -132,20 +134,20 @@ public abstract class JmsConsumer extends AbstractProcessor {
                 break;
             }
 
-            if (message == null) {    // if no messages, we're done
+            if (message == null) { // if no messages, we're done
                 break;
             }
 
             try {
-				processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) );
-			} catch (Exception e) {
+                processingSummary.add(map2FlowFile(context, session, message, addAttributes, logger));
+            } catch (Exception e) {
                 logger.error("Failed to receive JMS Message due to {}", e);
                 wrappedConsumer.close(logger);
-                break;				
-			}
+                break;
+            }
         }
-        
-        if (processingSummary.getFlowFilesCreated()==0) {
+
+        if (processingSummary.getFlowFilesCreated() == 0) {
             context.yield();
             return;
         }
@@ -153,11 +155,12 @@ public abstract class JmsConsumer extends AbstractProcessor {
         session.commit();
 
         stopWatch.stop();
-        if (processingSummary.getFlowFilesCreated()>0) {
+        if (processingSummary.getFlowFilesCreated() > 0) {
             final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
             float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
             final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
-            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
+            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
+                    new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
         }
 
         // if we need to acknowledge the messages, do so now.
@@ -166,49 +169,51 @@ public abstract class JmsConsumer extends AbstractProcessor {
             try {
                 lastMessage.acknowledge();  // acknowledge all received messages by acknowledging only the last.
             } catch (final JMSException e) {
-                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
+                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}",
+                        new Object[]{processingSummary.getMessagesReceived(), e});
             }
         }
     }
-    
+
     public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
-    	
-    	// Currently not very useful, because always one Message == one FlowFile
+
+        // Currently not very useful, because always one Message == one FlowFile
         final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
-        
+
         FlowFile flowFile = session.create();
         try {
-        	// MapMessage is exception, add only name-value pairs to FlowFile attributes
+            // MapMessage is exception, add only name-value pairs to FlowFile attributes
             if (message instanceof MapMessage) {
-				MapMessage mapMessage = (MapMessage) message;
-            	flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));				
-			}
-            // all other message types, write Message body to FlowFile content 
+                MapMessage mapMessage = (MapMessage) message;
+                flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
+            } // all other message types, write Message body to FlowFile content 
             else {
-	            flowFile = session.write(flowFile, new OutputStreamCallback() {
-	                @Override
-	                public void process(final OutputStream rawOut) throws IOException {
-	                    try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
-	                        final byte[] messageBody = JmsFactory.createByteArray(message);
-	                        out.write(messageBody);
-	                    } catch (final JMSException e) {
-	                        throw new ProcessException("Failed to receive JMS Message due to {}", e);
-	                    }
-	                }
-	            });
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
+                            final byte[] messageBody = JmsFactory.createByteArray(message);
+                            out.write(messageBody);
+                        } catch (final JMSException e) {
+                            throw new ProcessException("Failed to receive JMS Message due to {}", e);
+                        }
+                    }
+                });
             }
-            
-            if (addAttributes)
-            	flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
-            
+
+            if (addAttributes) {
+                flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
+            }
+
             session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
             session.transfer(flowFile, REL_SUCCESS);
-            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
+            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'",
+                    new Object[]{flowFile, msgsThisFlowFile.get()});
 
             return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
-            
+
         } catch (Exception e) {
-        	session.remove(flowFile);
+            session.remove(flowFile);
             throw e;
         }
     }
@@ -221,13 +226,14 @@ public abstract class JmsConsumer extends AbstractProcessor {
             final String name = (String) enumeration.nextElement();
 
             final Object value = mapMessage.getObject(name);
-            if (value==null)
-            	valueMap.put(MAP_MESSAGE_PREFIX+name, "");
-            else
-            	valueMap.put(MAP_MESSAGE_PREFIX+name, value.toString());        
+            if (value == null) {
+                valueMap.put(MAP_MESSAGE_PREFIX + name, "");
+            } else {
+                valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString());
+            }
         }
 
         return valueMap;
-    }    
-    
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index d9317c4..c7842d9 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -107,11 +107,11 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .identifiesControllerService(SSLContextService.class)
             .build();
     public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
-		    .name("HTTP Headers to receive as Attributes (Regex)")
-		    .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
-		    .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-		    .required(false)
-		    .build();
+            .name("HTTP Headers to receive as Attributes (Regex)")
+            .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .required(false)
+            .build();
 
     public static final String URI = "/contentListener";
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
@@ -124,7 +124,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
 
     private volatile Server server = null;
-    private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<String, FlowFileEntryTimeWrapper>();
+    private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>();
     private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>();
 
     @Override
@@ -218,9 +218,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             httpConfiguration.addCustomizer(new SecureRequestCustomizer());
 
             // build the connector
-            connector = new ServerConnector(server,
-                    new SslConnectionFactory(contextFactory, "http/1.1"),
-                    new HttpConnectionFactory(httpConfiguration));
+            connector = new ServerConnector(server, new SslConnectionFactory(contextFactory, "http/1.1"), new HttpConnectionFactory(httpConfiguration));
         }
 
         // configure the port
@@ -247,7 +245,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
 
         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
-        	contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
+            contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
         }
         server.start();
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 561e333..fa17df1 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -73,50 +73,35 @@ import org.apache.commons.lang3.StringUtils;
 
 /**
  * <p>
- * This processor listens for Datagram Packets on a given port and concatenates
- * the contents of those packets together generating flow files roughly as often
- * as the internal buffer fills up or until no more data is currently available.
+ * This processor listens for Datagram Packets on a given port and concatenates the contents of those packets together generating flow files roughly as often as the internal buffer fills up or until
+ * no more data is currently available.
  * </p>
  *
  * <p>
  * This processor has the following required properties:
  * <ul>
- * <li><b>Port</b> - The port to listen on for data packets. Must be known by
- * senders of Datagrams.</li>
- * <li><b>Receive Timeout</b> - The time out period when waiting to receive data
- * from the socket. Specify units. Default is 5 secs.</li>
- * <li><b>Max Buffer Size</b> - Determines the size each receive buffer may be.
- * Specify units. Default is 1 MB.</li>
- * <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size
- * at which a flow file would be generated. A flow file will get made even if
- * this value isn't reached if there is no more data streaming in and this value
- * may be exceeded by the size of a single packet. Specify units. Default is 1
- * MB.</li>
- * <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should
- * be used. This is a suggestion to the Operating System to indicate how big the
- * udp socket buffer should be. Specify units. Default is 1 MB.")</li>
- * <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to
- * accept data from the socket. Higher numbers means more ram is allocated but
- * can allow better throughput. Default is 4.</li>
- * <li><b>Channel Reader Interval</b> - Scheduling interval for each read
- * channel. Specify units. Default is 50 millisecs.</li>
- * <li><b>FlowFiles Per Session</b> - The number of flow files per session.
- * Higher number is more efficient, but will lose more data if a problem occurs
- * that causes a rollback of a session. Default is 10</li>
+ * <li><b>Port</b> - The port to listen on for data packets. Must be known by senders of Datagrams.</li>
+ * <li><b>Receive Timeout</b> - The time out period when waiting to receive data from the socket. Specify units. Default is 5 secs.</li>
+ * <li><b>Max Buffer Size</b> - Determines the size each receive buffer may be. Specify units. Default is 1 MB.</li>
+ * <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size at which a flow file would be generated. A flow file will get made even if this value isn't reached if there is no more
+ * data streaming in and this value may be exceeded by the size of a single packet. Specify units. Default is 1 MB.</li>
+ * <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should be used. This is a suggestion to the Operating System to indicate how big the udp socket buffer should be. Specify units.
+ * Default is 1 MB.")</li>
+ * <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to accept data from the socket. Higher numbers means more ram is allocated but can allow better throughput. Default is
+ * 4.</li>
+ * <li><b>Channel Reader Interval</b> - Scheduling interval for each read channel. Specify units. Default is 50 millisecs.</li>
+ * <li><b>FlowFiles Per Session</b> - The number of flow files per session. Higher number is more efficient, but will lose more data if a problem occurs that causes a rollback of a session. Default is
+ * 10</li>
  * </ul>
  * </p>
  *
  * This processor has the following optional properties:
  * <ul>
- * <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams from
- * the specified Sending Host Port and this host will be accepted. Improves
- * Performance. May be a system property or an environment variable.</li>
- * <li><b>Sending Host Port</b> - Port being used by remote host to send
- * Datagrams. Only Datagrams from the specified Sending Host and this port will
- * be accepted. Improves Performance. May be a system property or an environment
- * variable.</li>
+ * <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an
+ * environment variable.</li>
+ * <li><b>Sending Host Port</b> - Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system
+ * property or an environment variable.</li>
  * </ul>
- * </p>
  *
  * <p>
  * The following relationships are required:
@@ -142,7 +127,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
             .build();
 
     static {
-        Set<Relationship> rels = new HashSet<>();
+        final Set<Relationship> rels = new HashSet<>();
         rels.add(RELATIONSHIP_SUCCESS);
         relationships = Collections.unmodifiableSet(rels);
     }
@@ -233,8 +218,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
 
     static {
         try {
-            final Enumeration<NetworkInterface> interfaceEnum
-                    = NetworkInterface.getNetworkInterfaces();
+            final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces();
             while (interfaceEnum.hasMoreElements()) {
                 final NetworkInterface ifc = interfaceEnum.nextElement();
                 interfaceSet.add(ifc.getName());
@@ -242,18 +226,15 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
         } catch (SocketException e) {
         }
     }
-    public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
-            .name("Local Network Interface")
-            .description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN."
-                    + "May be a system property or an environment variable.")
-            .addValidator(new Validator() {
+    public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder().
+            name("Local Network Interface").
+            description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN."
+                    + "May be a system property or an environment variable.").
+            addValidator(new Validator() {
                 @Override
                 public ValidationResult validate(String subject, String input, ValidationContext context) {
                     ValidationResult result = new ValidationResult.Builder()
-                    .subject("Local Network Interface")
-                    .valid(true)
-                    .input(input)
-                    .build();
+                    .subject("Local Network Interface").valid(true).input(input).build();
                     if (interfaceSet.contains(input.toLowerCase())) {
                         return result;
                     }
@@ -271,18 +252,12 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
                     } catch (IllegalArgumentException e) {
                         message = "Not a valid AttributeExpression: " + e.getMessage();
                     }
-                    result = new ValidationResult.Builder()
-                    .subject("Local Network Interface")
-                    .valid(false)
-                    .input(input)
-                    .explanation(message)
-                    .build();
+                    result = new ValidationResult.Builder().subject("Local Network Interface")
+                    .valid(false).input(input).explanation(message).build();
 
                     return result;
                 }
-            })
-            .expressionLanguageSupported(true)
-            .build();
+            }).expressionLanguageSupported(true).build();
 
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
@@ -326,8 +301,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
     }
 
     /**
-     * Create the ChannelListener and a thread that causes the Consumer to
-     * create flow files.
+     * Create the ChannelListener and a thread that causes the Consumer to create flow files.
      *
      * @param context
      * @throws IOException
@@ -336,8 +310,8 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
     public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
         getChannelListener(context);
         stopping.set(false);
-        Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService
-                .submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
+        Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService.
+                submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {
 
                     @Override
                     public Tuple<ProcessSession, List<FlowFile>> call() {
@@ -364,11 +338,10 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
                                         logger.debug("Have waited {} times", new Object[]{numWaits});
                                         numWaits = 0;
                                         if (session != null) {
-                                            Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(
-                                                    session,
-                                                    new ArrayList<>(newFlowFiles));
+                                            Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
                                             newFlowFiles.clear();
-                                            flowFilesPerSessionQueue.add(flowFilesPerSession);
+                                            flowFilesPerSessionQueue.
+                                            add(flowFilesPerSession);
                                         }
                                         session = sessionFactoryRef.get().createSession();
                                         consumer.setSession(session);
@@ -422,8 +395,7 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
                                 // if this is blown...consumer.isConsumerFinished will be true
                             }
                         }
-                        Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session,
-                                new ArrayList<>(newFlowFiles));
+                        Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
                         return flowFilesPerSession;
                     }
                 });
@@ -462,7 +434,8 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
 
                     @Override
                     public StreamConsumer newInstance(final String streamId) {
-                        final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger());
+                        final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.
+                                intValue(), getLogger());
                         consumerRef.set(consumer);
                         return consumer;
                     }
@@ -491,17 +464,11 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
         String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
         String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
         if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
-            result.add(new ValidationResult.Builder()
-                    .subject(SENDING_HOST.getName())
-                    .valid(false)
-                    .explanation("Must specify Sending Host when specifying Sending Host Port")
-                    .build());
+            result.add(new ValidationResult.Builder().subject(SENDING_HOST.getName()).valid(false)
+                    .explanation("Must specify Sending Host when specifying Sending Host Port").build());
         } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
-            result.add(new ValidationResult.Builder()
-                    .subject(SENDING_HOST_PORT.getName())
-                    .valid(false)
-                    .explanation("Must specify Sending Host Port when specifying Sending Host")
-                    .build());
+            result.add(
+                    new ValidationResult.Builder().subject(SENDING_HOST_PORT.getName()).valid(false).explanation("Must specify Sending Host Port when specifying Sending Host").build());
         }
         return result;
     }
@@ -552,7 +519,8 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
                                 new Object[]{existingFlowFile});
                     } else if (existingFlowFile != null) {
                         session.remove(existingFlowFile);
-                        logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", new Object[]{existingFlowFile});
+                        logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}",
+                                new Object[]{existingFlowFile});
                     }
                 }
                 session.commit();
@@ -607,18 +575,9 @@ public class ListenUDP extends AbstractSessionFactoryProcessor {
         public ValidationResult validate(String subject, String input, ValidationContext context) {
             try {
                 InetAddress.getByName(input);
-                return new ValidationResult.Builder()
-                        .subject(subject)
-                        .valid(true)
-                        .input(input)
-                        .build();
+                return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
             } catch (final UnknownHostException e) {
-                return new ValidationResult.Builder()
-                        .subject(subject)
-                        .valid(false)
-                        .input(input)
-                        .explanation("Unknown host: " + e)
-                        .build();
+                return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
index daf513b..b493c93 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
@@ -90,7 +90,10 @@ public class LogAttribute extends AbstractProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> supportedDescriptors;
 
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to this relationship").build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles are routed to this relationship")
+            .build();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 9ca6470..bd639dd 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -74,18 +74,36 @@ import org.apache.nifi.util.ObjectHolder;
 @SideEffectFree
 @TriggerWhenEmpty
 @Tags({"merge", "content", "correlation", "tar", "zip", "stream", "concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
-@CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
-@ReadsAttributes({ @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together"),
-    @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"),
-    @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle"),
-    @ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile"),
-    @ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the <Merge Format> property is set to TAR. The value of this attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used") })
-@WritesAttributes({ @WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching system time. Then a filename extension may be applied:"
-+"if Merge Format is TAR, then the filename will be appended with .tar, "
-+"if Merge Format is ZIP, then the filename will be appended with .zip, "
-+"if Merge Format is FlowFileStream, then the filename will be appended with .pkg"),
-        @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
-        @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") })
+@CapabilityDescription("Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. "
+        + "It is recommended that the Processor be configured with only a single incoming connection, as Group of FlowFiles will not be "
+        + "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.")
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+            + "All FlowFiles with the same value for this attribute will be bundled together"),
+    @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+            + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer "
+            + "between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"),
+    @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+            + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+            + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+            + "in the given bundle"),
+    @ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+            + "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+            + "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged "
+            + "FlowFile"),
+    @ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the <Merge Format> property is set to TAR. The value of this "
+            + "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should "
+            + "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used")})
+@WritesAttributes({
+    @WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename "
+            + "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching "
+            + "system time. Then a filename extension may be applied:"
+            + "if Merge Format is TAR, then the filename will be appended with .tar, "
+            + "if Merge Format is ZIP, then the filename will be appended with .zip, "
+            + "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"),
+    @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
+    @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+            + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output")})
 @SeeAlso(SegmentContent.class)
 public class MergeContent extends BinFiles {
 
@@ -103,11 +121,14 @@ public class MergeContent extends BinFiles {
     public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
             "Bin-Packing Algorithm",
             "Bin-Packing Algorithm",
-            "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally their attributes (if the <Correlation Attribute> property is set)");
+            "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
+            + "their attributes (if the <Correlation Attribute> property is set)");
     public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
             "Defragment",
             "Defragment",
-            "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility purposes) <segment.identifier>, <segment.count>, and <segment.index>");
+            "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+            + "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility "
+            + "purposes) <segment.identifier>, <segment.count>, and <segment.index>");
 
     public static final String MERGE_FORMAT_TAR_VALUE = "TAR";
     public static final String MERGE_FORMAT_ZIP_VALUE = "ZIP";
@@ -119,11 +140,16 @@ public class MergeContent extends BinFiles {
     public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue(
             MERGE_FORMAT_TAR_VALUE,
             MERGE_FORMAT_TAR_VALUE,
-            "A bin of FlowFiles will be combined into a single TAR file. The FlowFiles' <path> attribute will be used to create a directory in the TAR file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the TAR file. If a FlowFile has an attribute named <tar.permissions> that is 3 characters, each between 0-7, that attribute will be used as the TAR entry's 'mode'.");
+            "A bin of FlowFiles will be combined into a single TAR file. The FlowFiles' <path> attribute will be used to create a directory in the "
+            + "TAR file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the TAR file. "
+            + "If a FlowFile has an attribute named <tar.permissions> that is 3 characters, each between 0-7, that attribute will be used "
+            + "as the TAR entry's 'mode'.");
     public static final AllowableValue MERGE_FORMAT_ZIP = new AllowableValue(
             MERGE_FORMAT_ZIP_VALUE,
             MERGE_FORMAT_ZIP_VALUE,
-            "A bin of FlowFiles will be combined into a single ZIP file. The FlowFiles' <path> attribute will be used to create a directory in the ZIP file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the ZIP file. The <Compression Level> property indicates the ZIP compression to use.");
+            "A bin of FlowFiles will be combined into a single ZIP file. The FlowFiles' <path> attribute will be used to create a directory in the "
+            + "ZIP file if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be added at the root of the ZIP file. "
+            + "The <Compression Level> property indicates the ZIP compression to use.");
     public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V3 = new AllowableValue(
             MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE,
             MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE,
@@ -150,7 +176,9 @@ public class MergeContent extends BinFiles {
 
     public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
             .name("Merge Strategy")
-            .description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles")
+            .description("Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by "
+                    + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+                    + "chosen FlowFiles")
             .required(true)
             .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
             .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
@@ -165,14 +193,18 @@ public class MergeContent extends BinFiles {
     public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
             .required(true)
             .name("Attribute Strategy")
-            .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.")
+            .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any "
+                    + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. "
+                    + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same "
+                    + "value, will be preserved.")
             .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE)
             .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON)
             .build();
 
     public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Correlation Attribute Name")
-            .description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
+            .description("If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for "
+                    + "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
             .required(false)
             .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
             .defaultValue(null)
@@ -180,32 +212,37 @@ public class MergeContent extends BinFiles {
 
     public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
             .name("Header File")
-            .description("Filename specifying the header to use. If not specified, no header is supplied. This property is valid only when using the binary-concatenation merge strategy; otherwise, it is ignored.")
+            .description("Filename specifying the header to use. If not specified, no header is supplied. This property is valid only when using the "
+                    + "binary-concatenation merge strategy; otherwise, it is ignored.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
             .build();
     public static final PropertyDescriptor FOOTER = new PropertyDescriptor.Builder()
             .name("Footer File")
-            .description("Filename specifying the footer to use. If not specified, no footer is supplied. This property is valid only when using the binary-concatenation merge strategy; otherwise, it is ignored.")
+            .description("Filename specifying the footer to use. If not specified, no footer is supplied. This property is valid only when using the "
+                    + "binary-concatenation merge strategy; otherwise, it is ignored.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
             .build();
     public static final PropertyDescriptor DEMARCATOR = new PropertyDescriptor.Builder()
             .name("Demarcator File")
-            .description("Filename specifying the demarcator to use. If not specified, no demarcator is supplied. This property is valid only when using the binary-concatenation merge strategy; otherwise, it is ignored.")
+            .description("Filename specifying the demarcator to use. If not specified, no demarcator is supplied. This property is valid only when "
+                    + "using the binary-concatenation merge strategy; otherwise, it is ignored.")
             .required(false)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
             .build();
     public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
             .name("Compression Level")
-            .description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is ignored")
+            .description("Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is "
+                    + "ignored")
             .required(true)
             .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
             .defaultValue("1")
             .build();
     public static final PropertyDescriptor KEEP_PATH = new PropertyDescriptor.Builder()
             .name("Keep Path")
-            .description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names; if using other merge strategy, this value is ignored")
+            .description("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry "
+                    + "names; if using other merge strategy, this value is ignored")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -215,24 +252,22 @@ public class MergeContent extends BinFiles {
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
 
-
-	@Override
-	public Set<Relationship> getRelationships() {
-	    final Set<Relationship> relationships = new HashSet<>();
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
         relationships.add(REL_ORIGINAL);
         relationships.add(REL_FAILURE);
         relationships.add(REL_MERGED);
         return relationships;
-	}
-	
-	
-	@Override
-	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-	    final List<PropertyDescriptor> descriptors = new ArrayList<>();
-	    descriptors.add(MERGE_STRATEGY);
-	    descriptors.add(MERGE_FORMAT);
-	    descriptors.add(ATTRIBUTE_STRATEGY);
-	    descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(MERGE_STRATEGY);
+        descriptors.add(MERGE_FORMAT);
+        descriptors.add(ATTRIBUTE_STRATEGY);
+        descriptors.add(CORRELATION_ATTRIBUTE_NAME);
         descriptors.add(MIN_ENTRIES);
         descriptors.add(MAX_ENTRIES);
         descriptors.add(MIN_SIZE);
@@ -245,16 +280,15 @@ public class MergeContent extends BinFiles {
         descriptors.add(COMPRESSION_LEVEL);
         descriptors.add(KEEP_PATH);
         return descriptors;
-	}
-	
+    }
+
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
 
-
-	@Override
-	protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
-	    FlowFile processed = flowFile;
+    @Override
+    protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        FlowFile processed = flowFile;
         // handle backward compatibility with old segment attributes
         if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
             processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
@@ -265,12 +299,12 @@ public class MergeContent extends BinFiles {
         if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
             processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
         }
-        
+
         return processed;
-	}
+    }
 
-	@Override
-	protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
+    @Override
+    protected String getGroupId(final ProcessContext context, final FlowFile flowFile) {
         final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
         String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName);
 
@@ -280,20 +314,21 @@ public class MergeContent extends BinFiles {
         }
 
         return groupId;
-	}
+    }
 
-	@Override
-	protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
-		if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+    @Override
+    protected void setUpBinManager(final BinManager binManager, final ProcessContext context) {
+        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         }
-	}
+    }
 
-	@Override
-	protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
-			final ProcessSession session) throws ProcessException {
+    @Override
+    protected boolean processBin(final Bin unmodifiableBin, final List<FlowFileSessionWrapper> binCopy, final ProcessContext context,
+            final ProcessSession session) throws ProcessException {
 
-        final String mergeFormat = context.getProperty(MERGE_FORMAT).getValue();
+        final String mergeFormat = context.getProperty(MERGE_FORMAT).
+                getValue();
         MergeBin merger;
         switch (mergeFormat) {
             case MERGE_FORMAT_TAR_VALUE:
@@ -329,19 +364,18 @@ public class MergeContent extends BinFiles {
                 break;
         }
 
-
         if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
             final String error = getDefragmentValidationError(binCopy);
-            
+
             // Fail the flow files and commit them
             if (error != null) {
                 final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
                 getLogger().error(error + "; routing {} to failure", new Object[]{binDescription});
-                for ( final FlowFileSessionWrapper wrapper : binCopy ) {
+                for (final FlowFileSessionWrapper wrapper : binCopy) {
                     wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
                     wrapper.getSession().commit();
                 }
-                
+
                 return true;
             }
             Collections.sort(binCopy, new FragmentComparator());
@@ -385,7 +419,7 @@ public class MergeContent extends BinFiles {
             if (!isNumber(fragmentIndex)) {
                 return "Cannot Defragment " + flowFile + " because it does not have an integer value for the " + FRAGMENT_INDEX_ATTRIBUTE + " attribute";
             }
-            
+
             fragmentIdentifier = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
 
             final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
@@ -394,23 +428,26 @@ public class MergeContent extends BinFiles {
             } else if (decidedFragmentCount == null) {
                 decidedFragmentCount = fragmentCount;
             } else if (!decidedFragmentCount.equals(fragmentCount)) {
-                return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount;
+                return "Cannot Defragment " + flowFile + " because it is grouped with another FlowFile, and the two have differing values for the "
+                        + FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + fragmentCount;
             }
         }
-        
+
         final int numericFragmentCount;
         try {
             numericFragmentCount = Integer.parseInt(decidedFragmentCount);
         } catch (final NumberFormatException nfe) {
             return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the " + FRAGMENT_COUNT_ATTRIBUTE + " has a non-integer value of " + decidedFragmentCount;
         }
-        
-        if ( bin.size() < numericFragmentCount ) {
-            return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found only " + bin.size() + " fragments";
+
+        if (bin.size() < numericFragmentCount) {
+            return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found only "
+                    + bin.size() + " fragments";
         }
-        
-        if ( bin.size() > numericFragmentCount ) {
-            return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found " + bin.size() + " fragments for this identifier";
+
+        if (bin.size() > numericFragmentCount) {
+            return "Cannot Defragment FlowFiles with Fragment Identifier " + fragmentIdentifier + " because the expected number of fragments is " + decidedFragmentCount + " but found "
+                    + bin.size() + " fragments for this identifier";
         }
 
         return null;
@@ -421,7 +458,8 @@ public class MergeContent extends BinFiles {
             return false;
         }
 
-        return NUMBER_PATTERN.matcher(value).matches();
+        return NUMBER_PATTERN.matcher(value).
+                matches();
     }
 
     private class BinaryConcatenationMerge implements MergeBin {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
index ccebb46..be21b32 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
@@ -55,9 +55,7 @@ public class ModifyBytes extends AbstractProcessor {
             .name("success")
             .description("Processed flowfiles.")
             .build();
-    //
     private final Set<Relationship> relationships;
-    // Properties
     public static final PropertyDescriptor START_OFFSET = new PropertyDescriptor.Builder()
             .name("Start Offset")
             .description("Number of bytes removed at the beginning of the file.")
@@ -72,7 +70,6 @@ public class ModifyBytes extends AbstractProcessor {
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
             .defaultValue("0 B")
             .build();
-    // 
     private final List<PropertyDescriptor> propDescriptors;
 
     public ModifyBytes() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/54818893/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index ddb5330..99a29e5 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@ -56,8 +56,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 @CapabilityDescription("Monitors the flow for activity and sends out an indicator when the flow has not had any data for "
         + "some specified amount of time and again when the flow's activity is restored")
 @WritesAttributes({
-        @WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
-        @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned") })
+    @WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
+    @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")})
 public class MonitorActivity extends AbstractProcessor {
 
     public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder()
@@ -69,7 +69,8 @@ public class MonitorActivity extends AbstractProcessor {
             .build();
     public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new PropertyDescriptor.Builder()
             .name("Continually Send Messages")
-            .description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; if false, will send an indicator only when the flow first becomes inactive")
+            .description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; "
+                    + "if false, will send an indicator only when the flow first becomes inactive")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -95,11 +96,23 @@ public class MonitorActivity extends AbstractProcessor {
             .description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file")
             .required(false)
             .allowableValues("true", "false")
-            .defaultValue("false").build();
+            .defaultValue("false")
+            .build();
 
-    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All incoming FlowFiles are routed to success").build();
-    public static final Relationship REL_INACTIVE = new Relationship.Builder().name("inactive").description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold Duration amount of time").build();
-    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder().name("activity.restored").description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a period of inactivity").build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All incoming FlowFiles are routed to success")
+            .build();
+    public static final Relationship REL_INACTIVE = new Relationship.Builder()
+            .name("inactive")
+            .description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold "
+                    + "Duration amount of time")
+            .build();
+    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder()
+            .name("activity.restored")
+            .description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a "
+                    + "period of inactivity")
+            .build();
     public static final Charset UTF8 = Charset.forName("UTF-8");
 
     private List<PropertyDescriptor> properties;