You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/07 23:53:36 UTC
[16/17] nifi git commit: NIFI-810: Addressed several checkstyle
violations
NIFI-810: Addressed several checkstyle violations
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ccfb57fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ccfb57fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ccfb57fe
Branch: refs/heads/NIFI-810-InputRequirement
Commit: ccfb57fe9ff43f11319dcb1625bfc78b1d88f56a
Parents: b974445
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 7 17:48:51 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 7 17:48:51 2015 -0400
----------------------------------------------------------------------
.../annotation/behavior/InputRequirement.java | 70 +-
.../nifi/processors/aws/s3/PutS3Object.java | 46 +-
.../apache/nifi/controller/ProcessorNode.java | 88 +--
.../nifi/controller/StandardProcessorNode.java | 10 +-
.../standard/Base64EncodeContent.java | 168 ++---
.../nifi/processors/standard/ControlRate.java | 672 +++++++++----------
6 files changed, 534 insertions(+), 520 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
index 97e6b88..13f442c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.annotation.behavior;
import java.lang.annotation.Documented;
@@ -21,31 +37,31 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface InputRequirement {
- Requirement value();
-
- public static enum Requirement {
- /**
- * This value is used to indicate that the Processor requires input from other Processors
- * in order to run. As a result, the Processor will not be valid if it does not have any
- * incoming connections.
- */
- INPUT_REQUIRED,
-
- /**
- * This value is used to indicate that the Processor will consume data from an incoming
- * connection but does not require an incoming connection in order to perform its task.
- * If the {@link InputRequirement} annotation is not present, this is the default value
- * that is used.
- */
- INPUT_ALLOWED,
-
- /**
- * This value is used to indicate that the Processor is a "Source Processor" and does
- * not accept incoming connections. Because the Processor does not pull FlowFiles from
- * an incoming connection, it can be very confusing for users who create incoming connections
- * to the Processor. As a result, this value can be used in order to clarify that incoming
- * connections will not be used. This prevents the user from even creating such a connection.
- */
- INPUT_FORBIDDEN;
- }
+ Requirement value();
+
+ public static enum Requirement {
+ /**
+ * This value is used to indicate that the Processor requires input from other Processors
+ * in order to run. As a result, the Processor will not be valid if it does not have any
+ * incoming connections.
+ */
+ INPUT_REQUIRED,
+
+ /**
+ * This value is used to indicate that the Processor will consume data from an incoming
+ * connection but does not require an incoming connection in order to perform its task.
+ * If the {@link InputRequirement} annotation is not present, this is the default value
+ * that is used.
+ */
+ INPUT_ALLOWED,
+
+ /**
+ * This value is used to indicate that the Processor is a "Source Processor" and does
+ * not accept incoming connections. Because the Processor does not pull FlowFiles from
+ * an incoming connection, it can be very confusing for users who create incoming connections
+ * to the Processor. As a result, this value can be used in order to clarify that incoming
+ * connections will not be used. This prevents the user from even creating such a connection.
+ */
+ INPUT_FORBIDDEN;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 7398c4e..c7212f5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -59,10 +59,8 @@ import com.amazonaws.services.s3.model.StorageClass;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
- value = "The value of a User-Defined Metadata field to add to the S3 Object",
- description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
- supportsExpressionLanguage = true)
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
+ description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
@WritesAttributes({
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@@ -72,22 +70,22 @@ import com.amazonaws.services.s3.model.StorageClass;
public class PutS3Object extends AbstractS3Processor {
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
- .name("Expiration Time Rule")
- .required(false)
- .expressionLanguageSupported(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Expiration Time Rule")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
- .name("Storage Class")
- .required(true)
- .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
- .defaultValue(StorageClass.Standard.name())
- .build();
+ .name("Storage Class")
+ .required(true)
+ .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
+ .defaultValue(StorageClass.Standard.name())
+ .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
- FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+ Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+ FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -97,15 +95,15 @@ public class PutS3Object extends AbstractS3Processor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .dynamic(true)
- .build();
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamic(true)
+ .build();
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
@@ -176,9 +174,9 @@ public class PutS3Object extends AbstractS3Processor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
- getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
+ getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
} catch (final ProcessException | AmazonClientException pe) {
- getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
+ getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 2f72d0f..d340c77 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -31,72 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable {
- public ProcessorNode(final Processor processor, final String id,
- final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
- super(processor, id, validationContextFactory, serviceProvider);
- }
+ public ProcessorNode(final Processor processor, final String id,
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
+ super(processor, id, validationContextFactory, serviceProvider);
+ }
- public abstract boolean isIsolated();
+ public abstract boolean isIsolated();
- public abstract boolean isTriggerWhenAnyDestinationAvailable();
+ public abstract boolean isTriggerWhenAnyDestinationAvailable();
- @Override
- public abstract boolean isSideEffectFree();
+ @Override
+ public abstract boolean isSideEffectFree();
- public abstract boolean isTriggeredSerially();
+ public abstract boolean isTriggeredSerially();
- public abstract boolean isEventDrivenSupported();
+ public abstract boolean isEventDrivenSupported();
- public abstract boolean isHighThroughputSupported();
+ public abstract boolean isHighThroughputSupported();
- public abstract Requirement getInputRequirement();
+ public abstract Requirement getInputRequirement();
- @Override
- public abstract boolean isValid();
+ @Override
+ public abstract boolean isValid();
- public abstract void setScheduledState(ScheduledState scheduledState);
+ public abstract void setScheduledState(ScheduledState scheduledState);
- public abstract void setBulletinLevel(LogLevel bulletinLevel);
+ public abstract void setBulletinLevel(LogLevel bulletinLevel);
- public abstract LogLevel getBulletinLevel();
+ public abstract LogLevel getBulletinLevel();
- public abstract Processor getProcessor();
+ public abstract Processor getProcessor();
- public abstract void yield(long period, TimeUnit timeUnit);
+ public abstract void yield(long period, TimeUnit timeUnit);
- public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
+ public abstract void setAutoTerminatedRelationships(Set<Relationship> relationships);
- public abstract Set<Relationship> getAutoTerminatedRelationships();
+ public abstract Set<Relationship> getAutoTerminatedRelationships();
- public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
+ public abstract void setSchedulingStrategy(SchedulingStrategy schedulingStrategy);
- @Override
- public abstract SchedulingStrategy getSchedulingStrategy();
+ @Override
+ public abstract SchedulingStrategy getSchedulingStrategy();
- public abstract void setRunDuration(long duration, TimeUnit timeUnit);
+ public abstract void setRunDuration(long duration, TimeUnit timeUnit);
- public abstract long getRunDuration(TimeUnit timeUnit);
+ public abstract long getRunDuration(TimeUnit timeUnit);
- public abstract Map<String, String> getStyle();
+ public abstract Map<String, String> getStyle();
- public abstract void setStyle(Map<String, String> style);
+ public abstract void setStyle(Map<String, String> style);
- /**
- * @return the number of threads (concurrent tasks) currently being used by
- * this Processor
- */
- public abstract int getActiveThreadCount();
+ /**
+ * @return the number of threads (concurrent tasks) currently being used by
+ * this Processor
+ */
+ public abstract int getActiveThreadCount();
- /**
- * Verifies that this Processor can be started if the provided set of
- * services are enabled. This is introduced because we need to verify that
- * all components can be started before starting any of them. In order to do
- * that, we need to know that this component can be started if the given
- * services are enabled, as we will then enable the given services before
- * starting this component.
- *
- * @param ignoredReferences to ignore
- */
- public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
+ /**
+ * Verifies that this Processor can be started if the provided set of
+ * services are enabled. This is introduced because we need to verify that
+ * all components can be started before starting any of them. In order to do
+ * that, we need to know that this component can be started if the given
+ * services are enabled, as we will then enable the given services before
+ * starting this component.
+ *
+ * @param ignoredReferences to ignore
+ */
+ public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f69c510..ad22c6d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1306,9 +1306,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
@Override
- public void verifyModifiable() throws IllegalStateException {
- if (isRunning()) {
- throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
- }
- }>>>>>>>2215 bc848b7db395b2ca9ac7cc4dc10891393721
+ public void verifyModifiable() throws IllegalStateException {
+ if (isRunning()) {
+ throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 816b407..db45109 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -56,99 +56,99 @@ import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class Base64EncodeContent extends AbstractProcessor {
- public static final String ENCODE_MODE = "Encode";
- public static final String DECODE_MODE = "Decode";
+ public static final String ENCODE_MODE = "Encode";
+ public static final String DECODE_MODE = "Decode";
- public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
- .name("Mode")
- .description("Specifies whether the content should be encoded or decoded")
- .required(true)
- .allowableValues(ENCODE_MODE, DECODE_MODE)
- .defaultValue(ENCODE_MODE)
- .build();
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
- .build();
+ public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+ .name("Mode")
+ .description("Specifies whether the content should be encoded or decoded")
+ .required(true)
+ .allowableValues(ENCODE_MODE, DECODE_MODE)
+ .defaultValue(ENCODE_MODE)
+ .build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Any FlowFile that is successfully encoded or decoded will be routed to success")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
+ .build();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(MODE);
- this.properties = Collections.unmodifiableList(properties);
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(MODE);
+ this.properties = Collections.unmodifiableList(properties);
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
- final ProcessorLog logger = getLogger();
+ final ProcessorLog logger = getLogger();
- boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
- try {
- final StopWatch stopWatch = new StopWatch(true);
- if (encode) {
- flowFile = session.write(flowFile, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (Base64OutputStream bos = new Base64OutputStream(out)) {
- int len = -1;
- byte[] buf = new byte[8192];
- while ((len = in.read(buf)) > 0) {
- bos.write(buf, 0, len);
- }
- bos.flush();
- }
- }
- });
- } else {
- flowFile = session.write(flowFile, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
- int len = -1;
- byte[] buf = new byte[8192];
- while ((len = bis.read(buf)) > 0) {
- out.write(buf, 0, len);
- }
- out.flush();
- }
- }
- });
- }
+ boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+ try {
+ final StopWatch stopWatch = new StopWatch(true);
+ if (encode) {
+ flowFile = session.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out) throws IOException {
+ try (Base64OutputStream bos = new Base64OutputStream(out)) {
+ int len = -1;
+ byte[] buf = new byte[8192];
+ while ((len = in.read(buf)) > 0) {
+ bos.write(buf, 0, len);
+ }
+ bos.flush();
+ }
+ }
+ });
+ } else {
+ flowFile = session.write(flowFile, new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out) throws IOException {
+ try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
+ int len = -1;
+ byte[] buf = new byte[8192];
+ while ((len = bis.read(buf)) > 0) {
+ out.write(buf, 0, len);
+ }
+ out.flush();
+ }
+ }
+ });
+ }
- logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
- session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
- } catch (ProcessException e) {
- logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- }
- }
+ logger.info("Successfully {} {}", new Object[] {encode ? "encoded" : "decoded", flowFile});
+ session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (ProcessException e) {
+ logger.error("Failed to {} {} due to {}", new Object[] {encode ? "encode" : "decode", flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index a45c211..0847472 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -61,340 +61,340 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
@CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.")
public class ControlRate extends AbstractProcessor {
- public static final String DATA_RATE = "data rate";
- public static final String FLOWFILE_RATE = "flowfile count";
- public static final String ATTRIBUTE_RATE = "attribute value";
-
- public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
- .name("Rate Control Criteria")
- .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
- .required(true)
- .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
- .defaultValue(DATA_RATE)
- .build();
- public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
- .name("Maximum Rate")
- .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
- + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
- .build();
- public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
- .name("Rate Controlled Attribute")
- .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
- + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
- + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(false)
- .build();
- public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
- .name("Time Duration")
- .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
- .required(true)
- .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
- .defaultValue("1 min")
- .build();
- public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
- .name("Grouping Attribute")
- .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
- + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(false)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles are transferred to this relationship")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
- .build();
-
- private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
- private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
-
- private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
- private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(RATE_CONTROL_CRITERIA);
- properties.add(MAX_RATE);
- properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
- properties.add(TIME_PERIOD);
- properties.add(GROUPING_ATTRIBUTE_NAME);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final ValidationContext context) {
- final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
-
- final Validator rateValidator;
- switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
- case DATA_RATE:
- rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
- break;
- case ATTRIBUTE_RATE:
- rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
- final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
- if (rateAttr == null) {
- validationResults.add(new ValidationResult.Builder()
- .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
- .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
- .build());
- }
- break;
- case FLOWFILE_RATE:
- default:
- rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
- break;
- }
-
- final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
- if (!rateResult.isValid()) {
- validationResults.add(rateResult);
- }
-
- return validationResults;
- }
-
- @Override
- public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- super.onPropertyModified(descriptor, oldValue, newValue);
-
- if (descriptor.equals(RATE_CONTROL_CRITERIA)
- || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
- || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
- || descriptor.equals(TIME_PERIOD)) {
- // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
- throttleMap.clear();
- } else if (descriptor.equals(MAX_RATE)) {
- final long newRate;
- if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
- newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
- } else {
- newRate = Long.parseLong(newValue);
- }
-
- for (final Throttle throttle : throttleMap.values()) {
- throttle.setMaxRate(newRate);
- }
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final long lastClearTime = lastThrottleClearTime.get();
- final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
- if (lastClearTime < throttleExpirationMillis) {
- if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
- final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
- while (itr.hasNext()) {
- final Map.Entry<String, Throttle> entry = itr.next();
- final Throttle throttle = entry.getValue();
- if (throttle.tryLock()) {
- try {
- if (throttle.lastUpdateTime() < lastClearTime) {
- itr.remove();
- }
- } finally {
- throttle.unlock();
- }
- }
- }
- }
- }
-
- // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final ProcessorLog logger = getLogger();
- final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
- final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
- long rateValue;
- switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
- case DATA_RATE:
- rateValue = flowFile.getSize();
- break;
- case FLOWFILE_RATE:
- rateValue = 1;
- break;
- case ATTRIBUTE_RATE:
- final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
- if (attributeValue == null) {
- logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[]{flowFile, rateControlAttributeName});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
- logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
- new Object[]{flowFile, rateControlAttributeName, attributeValue});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- rateValue = Long.parseLong(attributeValue);
- break;
- default:
- throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
- }
-
- final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
- final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
- Throttle throttle = throttleMap.get(groupName);
- if (throttle == null) {
- throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
-
- final String maxRateValue = context.getProperty(MAX_RATE).getValue();
- final long newRate;
- if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
- newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
- } else {
- newRate = Long.parseLong(maxRateValue);
- }
- throttle.setMaxRate(newRate);
-
- throttleMap.put(groupName, throttle);
- }
-
- throttle.lock();
- try {
- if (throttle.tryAdd(rateValue)) {
- logger.info("transferring {} to 'success'", new Object[]{flowFile});
- session.transfer(flowFile, REL_SUCCESS);
- } else {
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile);
- }
- } finally {
- throttle.unlock();
- }
- }
-
- private static class TimestampedLong {
-
- private final Long value;
- private final long timestamp = System.currentTimeMillis();
-
- public TimestampedLong(final Long value) {
- this.value = value;
- }
-
- public Long getValue() {
- return value;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
- }
-
- private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
-
- @Override
- public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
- if (oldValue == null && toAdd == null) {
- return new TimestampedLong(0L);
- } else if (oldValue == null) {
- return toAdd;
- } else if (toAdd == null) {
- return oldValue;
- }
-
- return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
- }
-
- @Override
- public TimestampedLong createNew() {
- return new TimestampedLong(0L);
- }
-
- @Override
- public long getTimestamp(TimestampedLong entity) {
- return entity == null ? 0L : entity.getTimestamp();
- }
- }
-
- private static class Throttle extends ReentrantLock {
-
- private final AtomicLong maxRate = new AtomicLong(1L);
- private final long timePeriodValue;
- private final TimeUnit timePeriodUnit;
- private final TimedBuffer<TimestampedLong> timedBuffer;
- private final ProcessorLog logger;
-
- private volatile long penalizationExpired;
- private volatile long lastUpdateTime;
-
- public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
- this.timePeriodUnit = unit;
- this.timePeriodValue = timePeriod;
- this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
- this.logger = logger;
- }
-
- public void setMaxRate(final long maxRate) {
- this.maxRate.set(maxRate);
- }
-
- public long lastUpdateTime() {
- return lastUpdateTime;
- }
-
- public boolean tryAdd(final long value) {
- final long now = System.currentTimeMillis();
- if (penalizationExpired > now) {
- return false;
- }
-
- final long maxRateValue = maxRate.get();
-
- final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
- if (sum != null && sum.getValue() >= maxRateValue) {
- logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[]{sum.getValue(), value});
- return false;
- }
-
- logger.debug("current sum for throttle is {}, so allowing rate of {} through",
- new Object[]{sum == null ? 0 : sum.getValue(), value});
-
- final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
- if (transferred > maxRateValue) {
- final long amountOver = transferred - maxRateValue;
- // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
- final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
- final double pct = (double) amountOver / (double) maxRateValue;
- final long penalizationPeriod = (long) (milliDuration * pct);
- this.penalizationExpired = now + penalizationPeriod;
- logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
- }
-
- lastUpdateTime = now;
- return true;
- }
- }
+ public static final String DATA_RATE = "data rate";
+ public static final String FLOWFILE_RATE = "flowfile count";
+ public static final String ATTRIBUTE_RATE = "attribute value";
+
+ public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
+ .name("Rate Control Criteria")
+ .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
+ .required(true)
+ .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+ .defaultValue(DATA_RATE)
+ .build();
+ public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
+ .name("Maximum Rate")
+ .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+ .build();
+ public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+ .name("Rate Controlled Attribute")
+ .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
+ + "The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. "
+ + "This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
+ .name("Time Duration")
+ .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.")
+ .required(true)
+ .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+ .defaultValue("1 min")
+ .build();
+ public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+ .name("Grouping Attribute")
+ .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for "
+ + "each value specified by the attribute with this name. Changing this value resets the rate counters.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles are transferred to this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format")
+ .build();
+
+ private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
+ private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+
+ private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+ private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RATE_CONTROL_CRITERIA);
+ properties.add(MAX_RATE);
+ properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+ properties.add(TIME_PERIOD);
+ properties.add(GROUPING_ATTRIBUTE_NAME);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
+
+ final Validator rateValidator;
+ switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+ case DATA_RATE:
+ rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+ break;
+ case ATTRIBUTE_RATE:
+ rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+ final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+ if (rateAttr == null) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
+ .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
+ .build());
+ }
+ break;
+ case FLOWFILE_RATE:
+ default:
+ rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+ break;
+ }
+
+ final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
+ if (!rateResult.isValid()) {
+ validationResults.add(rateResult);
+ }
+
+ return validationResults;
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ super.onPropertyModified(descriptor, oldValue, newValue);
+
+ if (descriptor.equals(RATE_CONTROL_CRITERIA)
+ || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
+ || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
+ || descriptor.equals(TIME_PERIOD)) {
+ // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
+ throttleMap.clear();
+ } else if (descriptor.equals(MAX_RATE)) {
+ final long newRate;
+ if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+ newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
+ } else {
+ newRate = Long.parseLong(newValue);
+ }
+
+ for (final Throttle throttle : throttleMap.values()) {
+ throttle.setMaxRate(newRate);
+ }
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final long lastClearTime = lastThrottleClearTime.get();
+ final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ if (lastClearTime < throttleExpirationMillis) {
+ if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
+ final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
+ while (itr.hasNext()) {
+ final Map.Entry<String, Throttle> entry = itr.next();
+ final Throttle throttle = entry.getValue();
+ if (throttle.tryLock()) {
+ try {
+ if (throttle.lastUpdateTime() < lastClearTime) {
+ itr.remove();
+ }
+ } finally {
+ throttle.unlock();
+ }
+ }
+ }
+ }
+ }
+
+ // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ProcessorLog logger = getLogger();
+ final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+ final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+ long rateValue;
+ switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+ case DATA_RATE:
+ rateValue = flowFile.getSize();
+ break;
+ case FLOWFILE_RATE:
+ rateValue = 1;
+ break;
+ case ATTRIBUTE_RATE:
+ final String attributeValue = flowFile.getAttribute(rateControlAttributeName);
+ if (attributeValue == null) {
+ logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", new Object[] {flowFile, rateControlAttributeName});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+ logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive long",
+ new Object[] {flowFile, rateControlAttributeName, attributeValue});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ rateValue = Long.parseLong(attributeValue);
+ break;
+ default:
+ throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+ }
+
+ final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+ final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+ Throttle throttle = throttleMap.get(groupName);
+ if (throttle == null) {
+ throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
+
+ final String maxRateValue = context.getProperty(MAX_RATE).getValue();
+ final long newRate;
+ if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+ newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue();
+ } else {
+ newRate = Long.parseLong(maxRateValue);
+ }
+ throttle.setMaxRate(newRate);
+
+ throttleMap.put(groupName, throttle);
+ }
+
+ throttle.lock();
+ try {
+ if (throttle.tryAdd(rateValue)) {
+ logger.info("transferring {} to 'success'", new Object[] {flowFile});
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile);
+ }
+ } finally {
+ throttle.unlock();
+ }
+ }
+
+ private static class TimestampedLong {
+
+ private final Long value;
+ private final long timestamp = System.currentTimeMillis();
+
+ public TimestampedLong(final Long value) {
+ this.value = value;
+ }
+
+ public Long getValue() {
+ return value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
+
+ @Override
+ public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+ if (oldValue == null && toAdd == null) {
+ return new TimestampedLong(0L);
+ } else if (oldValue == null) {
+ return toAdd;
+ } else if (toAdd == null) {
+ return oldValue;
+ }
+
+ return new TimestampedLong(oldValue.getValue() + toAdd.getValue());
+ }
+
+ @Override
+ public TimestampedLong createNew() {
+ return new TimestampedLong(0L);
+ }
+
+ @Override
+ public long getTimestamp(TimestampedLong entity) {
+ return entity == null ? 0L : entity.getTimestamp();
+ }
+ }
+
+ private static class Throttle extends ReentrantLock {
+
+ private final AtomicLong maxRate = new AtomicLong(1L);
+ private final long timePeriodValue;
+ private final TimeUnit timePeriodUnit;
+ private final TimedBuffer<TimestampedLong> timedBuffer;
+ private final ProcessorLog logger;
+
+ private volatile long penalizationExpired;
+ private volatile long lastUpdateTime;
+
+ public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) {
+ this.timePeriodUnit = unit;
+ this.timePeriodValue = timePeriod;
+ this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess());
+ this.logger = logger;
+ }
+
+ public void setMaxRate(final long maxRate) {
+ this.maxRate.set(maxRate);
+ }
+
+ public long lastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ public boolean tryAdd(final long value) {
+ final long now = System.currentTimeMillis();
+ if (penalizationExpired > now) {
+ return false;
+ }
+
+ final long maxRateValue = maxRate.get();
+
+ final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit));
+ if (sum != null && sum.getValue() >= maxRateValue) {
+ logger.debug("current sum for throttle is {}, so not allowing rate of {} through", new Object[] {sum.getValue(), value});
+ return false;
+ }
+
+ logger.debug("current sum for throttle is {}, so allowing rate of {} through",
+ new Object[] {sum == null ? 0 : sum.getValue(), value});
+
+ final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
+ if (transferred > maxRateValue) {
+ final long amountOver = transferred - maxRateValue;
+ // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long
+ final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+ final double pct = (double) amountOver / (double) maxRateValue;
+ final long penalizationPeriod = (long) (milliDuration * pct);
+ this.penalizationExpired = now + penalizationPeriod;
+ logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[] {value, penalizationPeriod});
+ }
+
+ lastUpdateTime = now;
+ return true;
+ }
+ }
}