You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/25 16:12:20 UTC

[16/19] nifi git commit: NIFI-810: Addressed several checkstyle violations

NIFI-810: Addressed several checkstyle violations


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ccfb57fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ccfb57fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ccfb57fe

Branch: refs/heads/master
Commit: ccfb57fe9ff43f11319dcb1625bfc78b1d88f56a
Parents: b974445
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 7 17:48:51 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 7 17:48:51 2015 -0400

----------------------------------------------------------------------
 .../annotation/behavior/InputRequirement.java   |  70 +-
 .../nifi/processors/aws/s3/PutS3Object.java     |  46 +-
 .../apache/nifi/controller/ProcessorNode.java   |  88 +--
 .../nifi/controller/StandardProcessorNode.java  |  10 +-
 .../standard/Base64EncodeContent.java           | 168 ++---
 .../nifi/processors/standard/ControlRate.java   | 672 +++++++++----------
 6 files changed, 534 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
index 97e6b88..13f442c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.annotation.behavior;
 
 import java.lang.annotation.Documented;
@@ -21,31 +37,31 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 @Inherited
 public @interface InputRequirement {
-	Requirement value();
-	
-	public static enum Requirement {
-		/**
-		 * This value is used to indicate that the Processor requires input from other Processors
-		 * in order to run. As a result, the Processor will not be valid if it does not have any
-		 * incoming connections.
-		 */
-		INPUT_REQUIRED,
-		
-		/**
-		 * This value is used to indicate that the Processor will consume data from an incoming
-		 * connection but does not require an incoming connection in order to perform its task.
-		 * If the {@link InputRequirement} annotation is not present, this is the default value
-		 * that is used.
-		 */
-		INPUT_ALLOWED,
-		
-		/**
-		 * This value is used to indicate that the Processor is a "Source Processor" and does
-		 * not accept incoming connections. Because the Processor does not pull FlowFiles from
-		 * an incoming connection, it can be very confusing for users who create incoming connections
-		 * to the Processor. As a result, this value can be used in order to clarify that incoming
-		 * connections will not be used. This prevents the user from even creating such a connection.
-		 */
-		INPUT_FORBIDDEN;
-	}
+    Requirement value();
+
+    public static enum Requirement {
+        /**
+         * This value is used to indicate that the Processor requires input from other Processors
+         * in order to run. As a result, the Processor will not be valid if it does not have any
+         * incoming connections.
+         */
+        INPUT_REQUIRED,
+
+        /**
+         * This value is used to indicate that the Processor will consume data from an incoming
+         * connection but does not require an incoming connection in order to perform its task.
+         * If the {@link InputRequirement} annotation is not present, this is the default value
+         * that is used.
+         */
+        INPUT_ALLOWED,
+
+        /**
+         * This value is used to indicate that the Processor is a "Source Processor" and does
+         * not accept incoming connections. Because the Processor does not pull FlowFiles from
+         * an incoming connection, it can be very confusing for users who create incoming connections
+         * to the Processor. As a result, this value can be used in order to clarify that incoming
+         * connections will not be used. This prevents the user from even creating such a connection.
+         */
+        INPUT_FORBIDDEN;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 7398c4e..c7212f5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -59,10 +59,8 @@ import com.amazonaws.services.s3.model.StorageClass;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
-        value = "The value of a User-Defined Metadata field to add to the S3 Object",
-        description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
-        supportsExpressionLanguage = true)
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
+    description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
 @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
 @WritesAttributes({
     @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@@ -72,22 +70,22 @@ import com.amazonaws.services.s3.model.StorageClass;
 public class PutS3Object extends AbstractS3Processor {
 
     public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
-            .name("Expiration Time Rule")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+        .name("Expiration Time Rule")
+        .required(false)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
 
     public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
-            .name("Storage Class")
-            .required(true)
-            .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
-            .defaultValue(StorageClass.Standard.name())
-            .build();
+        .name("Storage Class")
+        .required(true)
+        .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
+        .defaultValue(StorageClass.Standard.name())
+        .build();
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
-                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+            FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -97,15 +95,15 @@ public class PutS3Object extends AbstractS3Processor {
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .dynamic(true)
-                .build();
+            .name(propertyDescriptorName)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .dynamic(true)
+            .build();
     }
 
     @Override
-	public void onTrigger(final ProcessContext context, final ProcessSession session) {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
@@ -176,9 +174,9 @@ public class PutS3Object extends AbstractS3Processor {
             final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, url, millis);
 
-            getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
+            getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
         } catch (final ProcessException | AmazonClientException pe) {
-            getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
+            getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
             session.transfer(flowFile, REL_FAILURE);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 2f72d0f..d340c77 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -31,72 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 
 public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
 
-	public ProcessorNode(final Processor processor, final String id,
-		final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
-		super(processor, id, validationContextFactory, serviceProvider);
-	}
+    public ProcessorNode(final Processor processor, final String id,
+        final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+        super(processor, id, validationContextFactory, serviceProvider);
+    }
 
-	public abstract boolean isIsolated();
+    public abstract boolean isIsolated();
 
-	public abstract boolean isTriggerWhenAnyDestinationAvailable();
+    public abstract boolean isTriggerWhenAnyDestinationAvailable();
 
-	@Override
-	public abstract boolean isSideEffectFree();
+    @Override
+    public abstract boolean isSideEffectFree();
 
-	public abstract boolean isTriggeredSerially();
+    public abstract boolean isTriggeredSerially();
 
-	public abstract boolean isEventDrivenSupported();
+    public abstract boolean isEventDrivenSupported();
 
-	public abstract boolean isHighThroughputSupported();
+    public abstract boolean isHighThroughputSupported();
 
-	public abstract Requirement getInputRequirement();
+    public abstract Requirement getInputRequirement();
 
-	@Override
-	public abstract boolean isValid();
+    @Override
+    public abstract boolean isValid();
 
-	public abstract void setScheduledState(ScheduledState scheduledState);
+    public abstract void setScheduledState(ScheduledState scheduledState);
 
-	public abstract void setBulletinLevel(LogLevel bulletinLevel);
+    public abstract void setBulletinLevel(LogLevel bulletinLevel);
 
-	public abstract LogLevel getBulletinLevel();
+    public abstract LogLevel getBulletinLevel();
 
-	public abstract Processor getProcessor();
+    public abstract Processor getProcessor();
 
-	public abstract void yield(long period, TimeUnit timeUnit);
+    public abstract void yield(long period, TimeUnit timeUnit);
 
-	public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
+    public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
 
-	public abstract Set<Relationship> getAutoTerminatedRelationships();
+    public abstract Set<Relationship> getAutoTerminatedRelationships();
 
-	public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
+    public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
 
-	@Override
-	public abstract SchedulingStrategy getSchedulingStrategy();
+    @Override
+    public abstract SchedulingStrategy getSchedulingStrategy();
 
-	public abstract void setRunDuration(long duration, TimeUnit timeUnit);
+    public abstract void setRunDuration(long duration, TimeUnit timeUnit);
 
-	public abstract long getRunDuration(TimeUnit timeUnit);
+    public abstract long getRunDuration(TimeUnit timeUnit);
 
-	public abstract Map<String, String> getStyle();
+    public abstract Map<String, String> getStyle();
 
-	public abstract void setStyle(Map<String, String> style);
+    public abstract void setStyle(Map<String, String> style);
 
-	/**
-	 * @return the number of threads (concurrent tasks) currently being used by
-	 * this Processor
-	 */
-	public abstract int getActiveThreadCount();
+    /**
+     * @return the number of threads (concurrent tasks) currently being used by
+     *         this Processor
+     */
+    public abstract int getActiveThreadCount();
 
-	/**
-	 * Verifies that this Processor can be started if the provided set of
-	 * services are enabled. This is introduced because we need to verify that
-	 * all components can be started before starting any of them. In order to do
-	 * that, we need to know that this component can be started if the given
-	 * services are enabled, as we will then enable the given services before
-	 * starting this component.
-	 *
-	 * @param ignoredReferences to ignore
-	 */
-	public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+    /**
+     * Verifies that this Processor can be started if the provided set of
+     * services are enabled. This is introduced because we need to verify that
+     * all components can be started before starting any of them. In order to do
+     * that, we need to know that this component can be started if the given
+     * services are enabled, as we will then enable the given services before
+     * starting this component.
+     *
+     * @param ignoredReferences to ignore
+     */
+    public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f69c510..ad22c6d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1306,9 +1306,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     }
 
     @Override
-	public void verifyModifiable() throws IllegalStateException {
-		if (isRunning()) {
-			throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-		}
-	}>>>>>>>2215 bc848b7db395b2ca9ac7cc4dc10891393721
+    public void verifyModifiable() throws IllegalStateException {
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 816b407..db45109 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -56,99 +56,99 @@ import org.apache.nifi.util.StopWatch;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class Base64EncodeContent extends AbstractProcessor {
 
-	public static final String ENCODE_MODE = "Encode";
-	public static final String DECODE_MODE = "Decode";
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
 
-	public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
-		.name("Mode")
-		.description("Specifies whether the content should be encoded or decoded")
-		.required(true)
-		.allowableValues(ENCODE_MODE, DECODE_MODE)
-		.defaultValue(ENCODE_MODE)
-		.build();
-	public static final Relationship REL_SUCCESS = new Relationship.Builder()
-		.name("success")
-		.description("Any FlowFile that is successfully encoded or decoded will be routed to success")
-		.build();
-	public static final Relationship REL_FAILURE = new Relationship.Builder()
-		.name("failure")
-		.description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
-		.build();
+    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+        .name("Mode")
+        .description("Specifies whether the content should be encoded or decoded")
+        .required(true)
+        .allowableValues(ENCODE_MODE, DECODE_MODE)
+        .defaultValue(ENCODE_MODE)
+        .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
+        .build();
 
-	private List<PropertyDescriptor> properties;
-	private Set<Relationship> relationships;
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
 
-	@Override
-	protected void init(final ProcessorInitializationContext context) {
-		final List<PropertyDescriptor> properties = new ArrayList<>();
-		properties.add(MODE);
-		this.properties = Collections.unmodifiableList(properties);
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(MODE);
+        this.properties = Collections.unmodifiableList(properties);
 
-		final Set<Relationship> relationships = new HashSet<>();
-		relationships.add(REL_SUCCESS);
-		relationships.add(REL_FAILURE);
-		this.relationships = Collections.unmodifiableSet(relationships);
-	}
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
 
-	@Override
-	public Set<Relationship> getRelationships() {
-		return relationships;
-	}
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
 
-	@Override
-	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-		return properties;
-	}
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
 
-	@Override
-	public void onTrigger(final ProcessContext context, final ProcessSession session) {
-		FlowFile flowFile = session.get();
-		if (flowFile == null) {
-			return;
-		}
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
 
-		final ProcessorLog logger = getLogger();
+        final ProcessorLog logger = getLogger();
 
-		boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
-		try {
-			final StopWatch stopWatch = new StopWatch(true);
-			if (encode) {
-				flowFile = session.write(flowFile, new StreamCallback() {
-					@Override
-					public void process(InputStream in, OutputStream out) throws IOException {
-						try (Base64OutputStream bos = new Base64OutputStream(out)) {
-							int len = -1;
-							byte[] buf = new byte[8192];
-							while ((len = in.read(buf)) > 0) {
-								bos.write(buf, 0, len);
-							}
-							bos.flush();
-						}
-					}
-				});
-			} else {
-				flowFile = session.write(flowFile, new StreamCallback() {
-					@Override
-					public void process(InputStream in, OutputStream out) throws IOException {
-						try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
-							int len = -1;
-							byte[] buf = new byte[8192];
-							while ((len = bis.read(buf)) > 0) {
-								out.write(buf, 0, len);
-							}
-							out.flush();
-						}
-					}
-				});
-			}
+        boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            if (encode) {
+                flowFile = session.write(flowFile, new StreamCallback() {
+                    @Override
+                    public void process(InputStream in, OutputStream out) throws IOException {
+                        try (Base64OutputStream bos = new Base64OutputStream(out)) {
+                            int len = -1;
+                            byte[] buf = new byte[8192];
+                            while ((len = in.read(buf)) > 0) {
+                                bos.write(buf, 0, len);
+                            }
+                            bos.flush();
+                        }
+                    }
+                });
+            } else {
+                flowFile = session.write(flowFile, new StreamCallback() {
+                    @Override
+                    public void process(InputStream in, OutputStream out) throws IOException {
+                        try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
+                            int len = -1;
+                            byte[] buf = new byte[8192];
+                            while ((len = bis.read(buf)) > 0) {
+                                out.write(buf, 0, len);
+                            }
+                            out.flush();
+                        }
+                    }
+                });
+            }
 
-			logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
-			session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-			session.transfer(flowFile, REL_SUCCESS);
-		} catch (ProcessException e) {
-			logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
-			session.transfer(flowFile, REL_FAILURE);
-		}
-	}
+            logger.info("Successfully {} {}", new Object[] {encode ? "encoded" : "decoded", flowFile});
+            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (ProcessException e) {
+            logger.error("Failed to {} {} due to {}", new Object[] {encode ? "encode" : "decode", flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index a45c211..0847472 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -61,340 +61,340 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
 @CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.")
 public class ControlRate extends AbstractProcessor {
 
-	public static final String DATA_RATE = "data rate";
-	public static final String FLOWFILE_RATE = "flowfile count";
-	public static final String ATTRIBUTE_RATE = "attribute value";
-
-	public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
-		.name("Rate Control Criteria")
-		.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
-		.required(true)
-		.allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
-		.defaultValue(DATA_RATE)
-		.build();
-	public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
-		.name("Maximum Rate")
-		.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
-			+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-		.required(true)
-		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
-		.build();
-	public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
-		.name("Rate Controlled Attribute")
-		.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
-			+ "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
-			+ "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
-		.required(false)
-		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-		.expressionLanguageSupported(false)
-		.build();
-	public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
-		.name("Time Duration")
-		.description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
-		.required(true)
-		.addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-		.defaultValue("1 min")
-		.build();
-	public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
-		.name("Grouping Attribute")
-		.description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
-			+ "each value specified by the attribute with this name. Changing this value resets the rate counters.")
-		.required(false)
-		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-		.expressionLanguageSupported(false)
-		.build();
-
-	public static final Relationship REL_SUCCESS = new Relationship.Builder()
-		.name("success")
-		.description("All FlowFiles are transferred to this relationship")
-		.build();
-	public static final Relationship REL_FAILURE = new Relationship.Builder()
-		.name("failure")
-		.description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
-		.build();
-
-	private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
-	private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
-
-	private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
-	private List<PropertyDescriptor> properties;
-	private Set<Relationship> relationships;
-	private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
-
-	@Override
-	protected void init(final ProcessorInitializationContext context) {
-		final List<PropertyDescriptor> properties = new ArrayList<>();
-		properties.add(RATE_CONTROL_CRITERIA);
-		properties.add(MAX_RATE);
-		properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
-		properties.add(TIME_PERIOD);
-		properties.add(GROUPING_ATTRIBUTE_NAME);
-		this.properties = Collections.unmodifiableList(properties);
-
-		final Set<Relationship> relationships = new HashSet<>();
-		relationships.add(REL_SUCCESS);
-		this.relationships = Collections.unmodifiableSet(relationships);
-	}
-
-	@Override
-	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-		return properties;
-	}
-
-	@Override
-	public Set<Relationship> getRelationships() {
-		return relationships;
-	}
-
-	@Override
-	protected Collection<ValidationResult> customValidate(final ValidationContext context) {
-		final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
-
-		final Validator rateValidator;
-		switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-			case DATA_RATE:
-				rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
-				break;
-			case ATTRIBUTE_RATE:
-				rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
-				final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-				if (rateAttr == null) {
-					validationResults.add(new ValidationResult.Builder()
-						.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
-						.explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
-						.build());
-				}
-				break;
-			case FLOWFILE_RATE:
-			default:
-				rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
-				break;
-		}
-
-		final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
-		if (!rateResult.isValid()) {
-			validationResults.add(rateResult);
-		}
-
-		return validationResults;
-	}
-
-	@Override
-	public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-		super.onPropertyModified(descriptor, oldValue, newValue);
-
-		if (descriptor.equals(RATE_CONTROL_CRITERIA)
-			|| descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
-			|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
-			|| descriptor.equals(TIME_PERIOD)) {
-			// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
-			throttleMap.clear();
-		} else if (descriptor.equals(MAX_RATE)) {
-			final long newRate;
-			if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
-				newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
-			} else {
-				newRate = Long.parseLong(newValue);
-			}
-
-			for (final Throttle throttle : throttleMap.values()) {
-				throttle.setMaxRate(newRate);
-			}
-		}
-	}
-
-	@Override
-	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-		final long lastClearTime = lastThrottleClearTime.get();
-		final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
-		if (lastClearTime < throttleExpirationMillis) {
-			if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
-				final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
-				while (itr.hasNext()) {
-					final Map.Entry<String, Throttle> entry = itr.next();
-					final Throttle throttle = entry.getValue();
-					if (throttle.tryLock()) {
-						try {
-							if (throttle.lastUpdateTime() < lastClearTime) {
-								itr.remove();
-							}
-						} finally {
-							throttle.unlock();
-						}
-					}
-				}
-			}
-		}
-
-		// TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
-		FlowFile flowFile = session.get();
-		if (flowFile == null) {
-			return;
-		}
-
-		final ProcessorLog logger = getLogger();
-		final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
-		final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-		long rateValue;
-		switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-			case DATA_RATE:
-				rateValue = flowFile.getSize();
-				break;
-			case FLOWFILE_RATE:
-				rateValue = 1;
-				break;
-			case ATTRIBUTE_RATE:
-				final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
-				if (attributeValue == null) {
-					logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
-					session.transfer(flowFile, REL_FAILURE);
-					return;
-				}
-
-				if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-					logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
-						new Object[]{flowFile, rateControlAttributeName, attributeValue});
-					session.transfer(flowFile, REL_FAILURE);
-					return;
-				}
-				rateValue = Long.parseLong(attributeValue);
-				break;
-			default:
-				throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
-		}
-
-		final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
-		final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
-		Throttle throttle = throttleMap.get(groupName);
-		if (throttle == null) {
-			throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
-
-			final String maxRateValue = context.getProperty(MAX_RATE).getValue();
-			final long newRate;
-			if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
-				newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
-			} else {
-				newRate = Long.parseLong(maxRateValue);
-			}
-			throttle.setMaxRate(newRate);
-
-			throttleMap.put(groupName, throttle);
-		}
-
-		throttle.lock();
-		try {
-			if (throttle.tryAdd(rateValue)) {
-				logger.info("transferring {} to 'success'", new Object[]{flowFile});
-				session.transfer(flowFile, REL_SUCCESS);
-			} else {
-				flowFile = session.penalize(flowFile);
-				session.transfer(flowFile);
-			}
-		} finally {
-			throttle.unlock();
-		}
-	}
-
-	private static class TimestampedLong {
-
-		private final Long value;
-		private final long timestamp = System.currentTimeMillis();
-
-		public TimestampedLong(final Long value) {
-			this.value = value;
-		}
-
-		public Long getValue() {
-			return value;
-		}
-
-		public long getTimestamp() {
-			return timestamp;
-		}
-	}
-
-	private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
-
-		@Override
-		public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
-			if (oldValue == null && toAdd == null) {
-				return new TimestampedLong(0L);
-			} else if (oldValue == null) {
-				return toAdd;
-			} else if (toAdd == null) {
-				return oldValue;
-			}
-
-			return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
-		}
-
-		@Override
-		public TimestampedLong createNew() {
-			return new TimestampedLong(0L);
-		}
-
-		@Override
-		public long getTimestamp(TimestampedLong entity) {
-			return entity == null ? 0L : entity.getTimestamp();
-		}
-	}
-
-	private static class Throttle extends ReentrantLock {
-
-		private final AtomicLong maxRate = new AtomicLong(1L);
-		private final long timePeriodValue;
-		private final TimeUnit timePeriodUnit;
-		private final TimedBuffer<TimestampedLong> timedBuffer;
-		private final ProcessorLog logger;
-
-		private volatile long penalizationExpired;
-		private volatile long lastUpdateTime;
-
-		public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
-			this.timePeriodUnit = unit;
-			this.timePeriodValue = timePeriod;
-			this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
-			this.logger = logger;
-		}
-
-		public void setMaxRate(final long maxRate) {
-			this.maxRate.set(maxRate);
-		}
-
-		public long lastUpdateTime() {
-			return lastUpdateTime;
-		}
-
-		public boolean tryAdd(final long value) {
-			final long now = System.currentTimeMillis();
-			if (penalizationExpired > now) {
-				return false;
-			}
-
-			final long maxRateValue = maxRate.get();
-
-			final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
-			if (sum != null && sum.getValue() >= maxRateValue) {
-				logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
-				return false;
-			}
-
-			logger.debug("current sum for throttle is {}, so allowing rate of {} through",
-				new Object[]{sum == null ? 0 : sum.getValue(), value});
-
-			final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
-			if (transferred > maxRateValue) {
-				final long amountOver = transferred - maxRateValue;
-				// determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
-				final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
-				final double pct = (double) amountOver / (double) maxRateValue;
-				final long penalizationPeriod = (long) (milliDuration * pct);
-				this.penalizationExpired = now + penalizationPeriod;
-				logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
-			}
-
-			lastUpdateTime = now;
-			return true;
-		}
-	}
+    public static final String DATA_RATE = "data rate";
+    public static final String FLOWFILE_RATE = "flowfile count";
+    public static final String ATTRIBUTE_RATE = "attribute value";
+
+    public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
+        .name("Rate Control Criteria")
+        .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
+        .required(true)
+        .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+        .defaultValue(DATA_RATE)
+        .build();
+    public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
+        .name("Maximum Rate")
+        .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+            + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+        .build();
+    public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("Rate Controlled Attribute")
+        .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+            + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+            + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
+        .name("Time Duration")
+        .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
+        .required(true)
+        .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+        .defaultValue("1 min")
+        .build();
+    public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("Grouping Attribute")
+        .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+            + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are transferred to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
+        .build();
+
+    private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
+    private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+
+    private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+    private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RATE_CONTROL_CRITERIA);
+        properties.add(MAX_RATE);
+        properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+        properties.add(TIME_PERIOD);
+        properties.add(GROUPING_ATTRIBUTE_NAME);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
+
+        final Validator rateValidator;
+        switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+            case DATA_RATE:
+                rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+                break;
+            case ATTRIBUTE_RATE:
+                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+                if (rateAttr == null) {
+                    validationResults.add(new ValidationResult.Builder()
+                        .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
+                        .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
+                        .build());
+                }
+                break;
+            case FLOWFILE_RATE:
+            default:
+                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                break;
+        }
+
+        final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
+        if (!rateResult.isValid()) {
+            validationResults.add(rateResult);
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+
+        if (descriptor.equals(RATE_CONTROL_CRITERIA)
+            || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
+            || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
+            || descriptor.equals(TIME_PERIOD)) {
+            // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
+            throttleMap.clear();
+        } else if (descriptor.equals(MAX_RATE)) {
+            final long newRate;
+            if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+                newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
+            } else {
+                newRate = Long.parseLong(newValue);
+            }
+
+            for (final Throttle throttle : throttleMap.values()) {
+                throttle.setMaxRate(newRate);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long lastClearTime = lastThrottleClearTime.get();
+        final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (lastClearTime < throttleExpirationMillis) {
+            if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
+                final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
+                while (itr.hasNext()) {
+                    final Map.Entry<String, Throttle> entry = itr.next();
+                    final Throttle throttle = entry.getValue();
+                    if (throttle.tryLock()) {
+                        try {
+                            if (throttle.lastUpdateTime() < lastClearTime) {
+                                itr.remove();
+                            }
+                        } finally {
+                            throttle.unlock();
+                        }
+                    }
+                }
+            }
+        }
+
+        // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+        final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+        final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+        long rateValue;
+        switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+            case DATA_RATE:
+                rateValue = flowFile.getSize();
+                break;
+            case FLOWFILE_RATE:
+                rateValue = 1;
+                break;
+            case ATTRIBUTE_RATE:
+                final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
+                if (attributeValue == null) {
+                    logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[] {flowFile, rateControlAttributeName});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+
+                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+                    logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
+                        new Object[] {flowFile, rateControlAttributeName, attributeValue});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+                rateValue = Long.parseLong(attributeValue);
+                break;
+            default:
+                throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+        }
+
+        final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+        final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+        Throttle throttle = throttleMap.get(groupName);
+        if (throttle == null) {
+            throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
+
+            final String maxRateValue = context.getProperty(MAX_RATE).getValue();
+            final long newRate;
+            if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+                newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
+            } else {
+                newRate = Long.parseLong(maxRateValue);
+            }
+            throttle.setMaxRate(newRate);
+
+            throttleMap.put(groupName, throttle);
+        }
+
+        throttle.lock();
+        try {
+            if (throttle.tryAdd(rateValue)) {
+                logger.info("transferring {} to 'success'", new Object[] {flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile);
+            }
+        } finally {
+            throttle.unlock();
+        }
+    }
+
+    private static class TimestampedLong {
+
+        private final Long value;
+        private final long timestamp = System.currentTimeMillis();
+
+        public TimestampedLong(final Long value) {
+            this.value = value;
+        }
+
+        public Long getValue() {
+            return value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
+
+        @Override
+        public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+            if (oldValue == null && toAdd == null) {
+                return new TimestampedLong(0L);
+            } else if (oldValue == null) {
+                return toAdd;
+            } else if (toAdd == null) {
+                return oldValue;
+            }
+
+            return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+        }
+
+        @Override
+        public TimestampedLong createNew() {
+            return new TimestampedLong(0L);
+        }
+
+        @Override
+        public long getTimestamp(TimestampedLong entity) {
+            return entity == null ? 0L : entity.getTimestamp();
+        }
+    }
+
+    private static class Throttle extends ReentrantLock {
+
+        private final AtomicLong maxRate = new AtomicLong(1L);
+        private final long timePeriodValue;
+        private final TimeUnit timePeriodUnit;
+        private final TimedBuffer<TimestampedLong> timedBuffer;
+        private final ProcessorLog logger;
+
+        private volatile long penalizationExpired;
+        private volatile long lastUpdateTime;
+
+        public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
+            this.timePeriodUnit = unit;
+            this.timePeriodValue = timePeriod;
+            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
+            this.logger = logger;
+        }
+
+        public void setMaxRate(final long maxRate) {
+            this.maxRate.set(maxRate);
+        }
+
+        public long lastUpdateTime() {
+            return lastUpdateTime;
+        }
+
+        public boolean tryAdd(final long value) {
+            final long now = System.currentTimeMillis();
+            if (penalizationExpired > now) {
+                return false;
+            }
+
+            final long maxRateValue = maxRate.get();
+
+            final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
+            if (sum != null && sum.getValue() >= maxRateValue) {
+                logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[] {sum.getValue(), value});
+                return false;
+            }
+
+            logger.debug("current sum for throttle is {}, so allowing rate of {} through",
+                new Object[] {sum == null ? 0 : sum.getValue(), value});
+
+            final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
+            if (transferred > maxRateValue) {
+                final long amountOver = transferred - maxRateValue;
+                // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
+                final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+                final double pct = (double) amountOver / (double) maxRateValue;
+                final long penalizationPeriod = (long) (milliDuration * pct);
+                this.penalizationExpired = now + penalizationPeriod;
+                logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[] {value, penalizationPeriod});
+            }
+
+            lastUpdateTime = now;
+            return true;
+        }
+    }
 }