Posted to issues@nifi.apache.org by martin-mucha <gi...@git.apache.org> on 2018/01/23 20:17:26 UTC

[GitHub] nifi pull request #2429: Allow delayed transfer

GitHub user martin-mucha opened a pull request:

    https://github.com/apache/nifi/pull/2429

    Allow delayed transfer

    NiFi has a concept of penalization, but the penalty has a fixed delay, and there is no way to change it dynamically.
    
    If we want to implement a retry flow, where a FlowFile loops back, we can either degrade the Processor's performance by yielding it, or we can wait actively. The latter is actually what is recommended as the correct approach.
    
    It seems that we could easily implement a better RetryProcessor; all we are missing is a `session.penalize` overload that accepts a `penalizationPeriod`. The Processor could then gradually increase the waiting time after each failure.
    
    ---
    This pull request provides both the core changes and an example processor.
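To make the gradual backoff concrete, here is a minimal Java sketch of the delay a retry processor could compute before calling the proposed `session.penalize` overload. The class and method names are hypothetical, and the sketch assumes a doubling (exponential) schedule capped at a maximum; the formula the PR actually uses is not visible in the quoted diff.

```java
// Hypothetical sketch: a per-retry penalty that doubles from an initial value
// and is capped at a maximum. Not code from the PR.
final class BackoffSketch {

    private BackoffSketch() {
    }

    /** Penalty in ms for a zero-based retry number: initialMillis * 2^retry, capped at maxMillis. */
    static long penaltyMillis(int retry, long initialMillis, long maxMillis) {
        if (retry < 0 || initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long penalty = initialMillis;
        for (int i = 0; i < retry; i++) {
            // Doubling would exceed the cap: return the cap (this also prevents overflow).
            if (penalty > maxMillis / 2) {
                return maxMillis;
            }
            penalty *= 2;
        }
        return Math.min(penalty, maxMillis);
    }

    public static void main(String[] args) {
        // Defaults that appear in the quoted diff: 50 ms initial, 10 minutes maximum.
        long max = 10L * 60 * 1000;
        for (int retry = 0; retry < 16; retry++) {
            System.out.println("retry " + retry + " -> " + penaltyMillis(retry, 50, max) + " ms");
        }
    }
}
```

With those defaults the penalty is 409600 ms at retry 13 (zero-based) and stays at the 600000 ms cap from retry 14 onward.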


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/martin-mucha/nifi allowDelayedTransfer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2429.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2429
    
----
commit 3fd6e4ce73e2dc9f747378ac0015f2389824968d
Author: Martin Mucha <al...@...>
Date:   2018-01-23T19:31:18Z

    NIFI-4805: allow delayed transfer, parameterize session.penalize
    
    NiFi has a concept of penalization, but the penalty has a fixed
    delay, and there is no way to change it dynamically.
    
    If we want to implement a retry flow, where a FlowFile loops back,
    we can either degrade the Processor's performance by yielding it, or
    we can wait actively. The latter is actually what is recommended as
    the correct approach.
    
    It seems that we could easily implement a better RetryProcessor; all
    we are missing is a `session.penalize` overload that accepts a
    `penalizationPeriod`. The Processor could then gradually increase
    the waiting time after each failure.
    
    Signed-off-by: Martin Mucha <al...@gmail.com>

commit 013b89e6e67e622e06b03d56eb9ec9e0a616718e
Author: Martin Mucha <al...@...>
Date:   2018-01-23T20:14:15Z

    NIFI-4805: Retry waiting, LogarithmicallyPenalizeFlowFileProcessor
    
    A processor that can be used to create a retry loop with logarithmic
    waiting.
    
    Signed-off-by: Martin Mucha <al...@gmail.com>

----


---

[GitHub] nifi issue #2429: Allow delayed transfer

Posted by alfonz19 <gi...@git.apache.org>.
Github user alfonz19 commented on the issue:

    https://github.com/apache/nifi/pull/2429
  
    Hi, 
    Someone from the NiFi team explained to me earlier that custom penalization via a parameterized `penalize` had already existed and was removed from the code on purpose. That decision makes this unnecessarily hard for such a simple task. There were lots of things like this: bugs, weird decisions, etc. To expand on that: our use case is not that hard, and we found it easier to implement all of it ourselves than to use NiFi, where some actions are unnecessarily hard to achieve and some decisions do not work for us. I guess it's different for bigger use cases, but... So no, I'm not working on it; we reimplemented everything and have no interest in continuing with NiFi. Sorry.


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175184191
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +package org.apache.nifi.processors.standard;
    +
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    +
    +    private static final PropertyDescriptor INITIAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("initialReconnectTimeout")
    +            .displayName("InitialReconnectTimeoutMs")
    +            .description("First delay length after first FlowFile arrival in millis")
    +            .defaultValue("50")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("maximalReconnectTimeout")
    +            .displayName("MaximalReconnectTimeoutMs")
    --- End diff --
    
    We would also probably want to call this "Maximum Penalty Duration" and should set the default value to "10 mins" and use a TIME_PERIOD_VALIDATOR.
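The point of a time-period value is that users can write "10 mins" instead of a raw millisecond count. As a rough illustration of what such a value reduces to, the hypothetical helper below parses a small subset of unit spellings into milliseconds. It is not NiFi's parser; inside a processor one would normally let TIME_PERIOD_VALIDATOR validate the value and read it with `context.getProperty(...).asTimePeriod(TimeUnit.MILLISECONDS)`.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; NiFi has its own time-period parsing.
final class TimePeriodSketch {

    // "<number> <unit>", e.g. "50 ms" or "10 mins" (whitespace optional).
    private static final Pattern PERIOD = Pattern.compile("(\\d+)\\s*([a-zA-Z]+)");

    // A small subset of unit spellings, mapped to TimeUnit.
    private static final Map<String, TimeUnit> UNITS = Map.ofEntries(
            Map.entry("ms", TimeUnit.MILLISECONDS),
            Map.entry("millis", TimeUnit.MILLISECONDS),
            Map.entry("sec", TimeUnit.SECONDS),
            Map.entry("secs", TimeUnit.SECONDS),
            Map.entry("seconds", TimeUnit.SECONDS),
            Map.entry("min", TimeUnit.MINUTES),
            Map.entry("mins", TimeUnit.MINUTES),
            Map.entry("minutes", TimeUnit.MINUTES),
            Map.entry("hr", TimeUnit.HOURS),
            Map.entry("hour", TimeUnit.HOURS),
            Map.entry("hours", TimeUnit.HOURS));

    private TimePeriodSketch() {
    }

    /** Parses a period such as "10 mins" into milliseconds. */
    static long toMillis(String value) {
        Matcher m = PERIOD.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a time period: " + value);
        }
        TimeUnit unit = UNITS.get(m.group(2).toLowerCase(Locale.ROOT));
        if (unit == null) {
            throw new IllegalArgumentException("unknown unit: " + m.group(2));
        }
        return unit.toMillis(Long.parseLong(m.group(1)));
    }

    public static void main(String[] args) {
        System.out.println(toMillis("50 ms"));   // prints 50
        System.out.println(toMillis("10 mins")); // prints 600000
    }
}
```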


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175183514
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    --- End diff --
    
    I would suggest a simpler name: PenalizeFlowFile. Generally we avoid using the word 'Processor' in processor names, and the fact that it is logarithmic is probably not important for describing the main concept of the processor, but it does make the name a bit of a mouthful to say :)


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175190618
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    +
    +    private static final PropertyDescriptor INITIAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("initialReconnectTimeout")
    +            .displayName("InitialReconnectTimeoutMs")
    +            .description("First delay length after first FlowFile arrival in millis")
    +            .defaultValue("50")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("maximalReconnectTimeout")
    +            .displayName("MaximalReconnectTimeoutMs")
    +            .description("Maximal delay length after first FlowFile arrival in millis")
    +            .defaultValue(""+10*60*1000)//10 minutes
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_NUMBER_OF_RETRIES = new PropertyDescriptor.Builder()
    +            .name("maximalNumberOfRetries")
    +            .displayName("maximalNumberOfRetries")
    +            .description("Maximal number of retries to allow")
    +            .defaultValue("10")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor RETRY_COUNT_PROPERTY = new PropertyDescriptor.Builder()
    +            .name("retryCountProperty")
    +            .displayName("RetryCountProperty")
    +            .description("Name of attribute in which number of retries should be stored.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(new Validator() {
    +                @Override
    +                public ValidationResult validate(String subject, String input, ValidationContext context) {
    +                    return new ValidationResult.Builder().valid(true).build();
    +                }
    +            })
    +            .build();
    --- End diff --
    
    I would think that most people won't actually care what attribute is used here, so I would set a default value so that users don't have to bother setting it.
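A minimal sketch of the bookkeeping such an attribute supports, using a plain `Map` in place of FlowFile attributes so it runs without NiFi. The default name `retry.count` and the helper names are assumptions for illustration; in a real processor the read and write would go through `flowFile.getAttribute(...)` and `session.putAttribute(...)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of retry-count bookkeeping; "retry.count" is an assumed default.
final class RetryCountSketch {

    static final String DEFAULT_RETRY_ATTRIBUTE = "retry.count";

    private RetryCountSketch() {
    }

    /** Reads the current retry count; a missing or non-numeric value counts as 0. */
    static int currentRetries(Map<String, String> attributes, String attributeName) {
        String raw = attributes.get(attributeName);
        if (raw == null) {
            return 0;
        }
        try {
            return Math.max(0, Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    /** Returns a copy of the attributes with the retry count incremented by one. */
    static Map<String, String> incrementRetries(Map<String, String> attributes, String attributeName) {
        Map<String, String> updated = new HashMap<>(attributes);
        updated.put(attributeName, Integer.toString(currentRetries(attributes, attributeName) + 1));
        return updated;
    }

    /** True once the FlowFile has used up its allowed retries. */
    static boolean exhausted(Map<String, String> attributes, String attributeName, int maxRetries) {
        return currentRetries(attributes, attributeName) >= maxRetries;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            attrs = incrementRetries(attrs, DEFAULT_RETRY_ATTRIBUTE);
        }
        System.out.println(attrs.get(DEFAULT_RETRY_ATTRIBUTE));            // prints 3
        System.out.println(exhausted(attrs, DEFAULT_RETRY_ATTRIBUTE, 10)); // prints false
    }
}
```

With a default in place, users only touch this property when the attribute name clashes with something else in their flow.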


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175184911
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    +
    +    private static final PropertyDescriptor INITIAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("initialReconnectTimeout")
    +            .displayName("InitialReconnectTimeoutMs")
    +            .description("First delay length after first FlowFile arrival in millis")
    +            .defaultValue("50")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("maximalReconnectTimeout")
    +            .displayName("MaximalReconnectTimeoutMs")
    +            .description("Maximal delay length after first FlowFile arrival in millis")
    +            .defaultValue(""+10*60*1000)//10 minutes
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_NUMBER_OF_RETRIES = new PropertyDescriptor.Builder()
    +            .name("maximalNumberOfRetries")
    +            .displayName("maximalNumberOfRetries")
    --- End diff --
    
    Should use a display name like "Maximum Number of Retries" (generally we tend to use the term 'Maximum' in our conventions over Maximal, though either is considered correct grammatically).


---

[GitHub] nifi issue #2429: Allow delayed transfer

Posted by patricker <gi...@git.apache.org>.
Github user patricker commented on the issue:

    https://github.com/apache/nifi/pull/2429
  
    @alfonz19 are you still working on this?


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175185179
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    +
    +    private static final PropertyDescriptor INITIAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("initialReconnectTimeout")
    +            .displayName("InitialReconnectTimeoutMs")
    +            .description("First delay length after first FlowFile arrival in millis")
    +            .defaultValue("50")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("maximalReconnectTimeout")
    +            .displayName("MaximalReconnectTimeoutMs")
    +            .description("Maximal delay length after first FlowFile arrival in millis")
    +            .defaultValue(""+10*60*1000)//10 minutes
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_NUMBER_OF_RETRIES = new PropertyDescriptor.Builder()
    +            .name("maximalNumberOfRetries")
    +            .displayName("maximalNumberOfRetries")
    +            .description("Maximal number of retries to allow")
    +            .defaultValue("10")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor RETRY_COUNT_PROPERTY = new PropertyDescriptor.Builder()
    +            .name("retryCountProperty")
    +            .displayName("RetryCountProperty")
    --- End diff --
    
    We should use spaces between words here. Also, I think the word 'property' should be 'attribute': "Retry Count Attribute".


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175183797
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    --- End diff --
    
    We need to have documentation, etc. added here. This processor can be @EventDriven and @SideEffectFree. It should also have a @CapabilityDescription and @Tags. It should probably also document which attributes it reads & writes via @ReadsAttributes and @WritesAttributes.


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175185294
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    +
    +    private static final PropertyDescriptor INITIAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("initialReconnectTimeout")
    +            .displayName("InitialReconnectTimeoutMs")
    +            .description("First delay length after first FlowFile arrival in millis")
    +            .defaultValue("50")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("maximalReconnectTimeout")
    +            .displayName("MaximalReconnectTimeoutMs")
    +            .description("Maximal delay length after first FlowFile arrival in millis")
    +            .defaultValue(""+10*60*1000)//10 minutes
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor MAXIMAL_NUMBER_OF_RETRIES = new PropertyDescriptor.Builder()
    +            .name("maximalNumberOfRetries")
    +            .displayName("maximalNumberOfRetries")
    +            .description("Maximal number of retries to allow")
    +            .defaultValue("10")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private static final PropertyDescriptor RETRY_COUNT_PROPERTY = new PropertyDescriptor.Builder()
    +            .name("retryCountProperty")
    +            .displayName("RetryCountProperty")
    +            .description("Name of attribute in which number of retries should be stored.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(new Validator() {
    --- End diff --
    
    We should use a NON_EMPTY_VALIDATOR here.
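The anonymous `Validator` in the diff returns a valid result for any input, including an empty string, so an empty attribute name would pass validation. The hypothetical predicates below contrast that with a non-empty check; they only approximate the idea behind NON_EMPTY_VALIDATOR and are not NiFi's `Validator` interface.

```java
import java.util.function.Predicate;

// Hypothetical stand-ins: the PR's always-valid validator vs. a non-empty check.
final class ValidatorSketch {

    // Mirrors the anonymous Validator in the diff: every input is accepted.
    static final Predicate<String> ALWAYS_VALID = input -> true;

    // Rejects null and the empty string, so an attribute name must be supplied.
    static final Predicate<String> NON_EMPTY = input -> input != null && !input.isEmpty();

    private ValidatorSketch() {
    }

    public static void main(String[] args) {
        for (String candidate : new String[] {"retry.count", ""}) {
            System.out.println("'" + candidate + "': always-valid=" + ALWAYS_VALID.test(candidate)
                    + ", non-empty=" + NON_EMPTY.test(candidate));
        }
    }
}
```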


---

[GitHub] nifi pull request #2429: Allow delayed transfer

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2429#discussion_r175184013
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogarithmicallyPenalizeFlowFileProcessor.java ---
    @@ -0,0 +1,122 @@
    +public class LogarithmicallyPenalizeFlowFileProcessor extends AbstractProcessor {
    +
    +    private static final PropertyDescriptor INITIAL_RECONNECT_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("initialReconnectTimeout")
    +            .displayName("InitialReconnectTimeoutMs")
    --- End diff --
    
    We should avoid using specific time units in property names or enforcing time units. Instead, we should call this "Initial Penalty" and use StandardValidators.TIME_PERIOD_VALIDATOR with a default value of "50 ms".


---