You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Boris Tyukin <bo...@boristyukin.com> on 2019/03/12 16:52:07 UTC

set penalty duration on a flowfile

Hi guys,

Is it possible to set penalty duration for a flowfile rather than a
processor? I have a custom groovy processor and I want to set penalty
dynamically based on some criteria.

I was also looking at controlRate processor but would rather penalize
flowfiles instead with a custom delay.

Thanks!
Boris

Re: set penalty duration on a flowfile

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

Even if there is a way to make this work, I would not recommend doing this.

Processors should not have access to internals of NiFI's framework
(i.e. nifi-framework-nar), they should only have access to what is
provided through the nifi-api which is the session and context.

If you start using stuff outside nifi-api, then your processors are
likely to break on upgrades because the developers are free to change
anything they want inside nifi-framework, the api is what is
guaranteed to be consistent.

-Bryan

On Wed, Apr 3, 2019 at 4:16 PM ara m. <ar...@gmail.com> wrote:
>
> I'm also trying to do this. Ive tried both in NiFi 171 and 191.
> ExecuteScript doesnt seem to be able to load StandardFlowFileRecord, which
> is strange since its part of nifi-framework-nar-*.nar which is in lib/.
> I read a post saying somebody was using ExecuteGroovyScript which had
> different imports, but that if you were using ExecuteScript you should have
> nifi classes loaded.
>
> Ideas why this is failing?
>
> I've tried even using import and adding lib/ to the modules directory of my
> ExecuteScript, ScriptEngine: Groovy.
>
> this is the body.
> import org.apache.commons.io.IOUtils
> import java.nio.charset.StandardCharsets
> import org.apache.nifi.controller.repository.*
>
> flowFile = session.get()
> if(!flowFile) return
>
> expirationEpochMillis = System.currentTimeMillis() +
> (flowFile.getAttribute('waitOnPriorsMinutes').toInteger() * 60 * 1000)
>
> newFlowFile = new
> StandardFlowFileRecord.Builder().fromFlowFile(flowFile).penaltyExpirationTime(expirationEpochMillis).build()
>
> session.penalize(newFlowFile)
> session.transfer(newFlowFile, REL_SUCCESS)
>
>
> I get the following error.
>
> Script879.groovy: 22: unable to resolve class StandardFlowFileRecord.Builder
>  @ line 22, column 15.
>    newFlowFile = new
> StandardFlowFileRecord.Builder().fromFlowFile(flowFile).penaltyExpirationTime(expirationEpochMillis).build()
>
>
>
>
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/

Re: set penalty duration on a flowfile

Posted by "ara m." <ar...@gmail.com>.
I'm also trying to do this. Ive tried both in NiFi 171 and 191.
ExecuteScript doesnt seem to be able to load StandardFlowFileRecord, which
is strange since its part of nifi-framework-nar-*.nar which is in lib/. 
I read a post saying somebody was using ExecuteGroovyScript which had
different imports, but that if you were using ExecuteScript you should have
nifi classes loaded. 

Ideas why this is failing? 

I've tried even using import and adding lib/ to the modules directory of my
ExecuteScript, ScriptEngine: Groovy. 

this is the body. 
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.repository.*

flowFile = session.get()
if(!flowFile) return

expirationEpochMillis = System.currentTimeMillis() +
(flowFile.getAttribute('waitOnPriorsMinutes').toInteger() * 60 * 1000)

newFlowFile = new
StandardFlowFileRecord.Builder().fromFlowFile(flowFile).penaltyExpirationTime(expirationEpochMillis).build()

session.penalize(newFlowFile)
session.transfer(newFlowFile, REL_SUCCESS)


I get the following error. 

Script879.groovy: 22: unable to resolve class StandardFlowFileRecord.Builder
 @ line 22, column 15.
   newFlowFile = new
StandardFlowFileRecord.Builder().fromFlowFile(flowFile).penaltyExpirationTime(expirationEpochMillis).build()





--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/

Re: set penalty duration on a flowfile

Posted by Boris Tyukin <bo...@boristyukin.com>.
nice, thanks Andy!

On Tue, Mar 12, 2019 at 3:56 PM Andy LoPresto <al...@apache.org> wrote:

> The code that currently penalizes a flowfile is below [1]. It reads
> directly from the context containing the processor’s configured (static)
> penalty duration. You could manually perform these steps (using your
> dynamic value) in your Groovy script instead. Then just pass the output to
> session.transfer(<penalized flowFile>, REL_FAILURE) as you would normally.
>
> final StandardRepositoryRecord record = getRecord(flowFile);
>         final long expirationEpochMillis = System.currentTimeMillis() +
> context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
>         final FlowFileRecord newFile = new
> StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
>
> [1]
> https://github.com/apache/nifi/blob/b508d6bfbc2b6a68fe2ab152de96611f06a8724a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L1793-L1795
>
> Andy LoPresto
> alopresto@apache.org
> *alopresto.apache@gmail.com <al...@gmail.com>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
> On Mar 12, 2019, at 9:52 AM, Boris Tyukin <bo...@boristyukin.com> wrote:
>
> Hi guys,
>
> Is it possible to set penalty duration for a flowfile rather than a
> processor? I have a custom groovy processor and I want to set penalty
> dynamically based on some criteria.
>
> I was also looking at controlRate processor but would rather penalize
> flowfiles instead with a custom delay.
>
> Thanks!
> Boris
>
>
>

Re: set penalty duration on a flowfile

Posted by Andy LoPresto <al...@apache.org>.
The code that currently penalizes a flowfile is below [1]. It reads directly from the context containing the processor’s configured (static) penalty duration. You could manually perform these steps (using your dynamic value) in your Groovy script instead. Then just pass the output to session.transfer(<penalized flowFile>, REL_FAILURE) as you would normally. 

	final StandardRepositoryRecord record = getRecord(flowFile);
        final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();

[1] https://github.com/apache/nifi/blob/b508d6bfbc2b6a68fe2ab152de96611f06a8724a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L1793-L1795 <https://github.com/apache/nifi/blob/b508d6bfbc2b6a68fe2ab152de96611f06a8724a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L1793-L1795>

Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Mar 12, 2019, at 9:52 AM, Boris Tyukin <bo...@boristyukin.com> wrote:
> 
> Hi guys,
> 
> Is it possible to set penalty duration for a flowfile rather than a processor? I have a custom groovy processor and I want to set penalty dynamically based on some criteria.
> 
> I was also looking at controlRate processor but would rather penalize flowfiles instead with a custom delay.
> 
> Thanks!
> Boris