Posted to dev@nifi.apache.org by DAVID SMITH <da...@btinternet.com> on 2015/08/15 11:04:54 UTC

Writing to a flowfile

Hi

I'm writing a processor which parses a file. If the parse succeeds, I want the parsed file to go to relationship parsed and the original file to go to relationship original.
If the parse fails, I want the original file to go to relationship failure.

I have an inner class which contains a callback that does the parsing. The callback is called from the onTrigger method.
My problem is that I want to read from my original FlowFile and write to a new FlowFile, but it always seems to write to the original FlowFile.
How do I direct my BufferedWriter to my new FlowFile?

Many thanks
Dave



RE: Writing to a flowfile

Posted by Mark Payne <ma...@hotmail.com>.
Dave,

Not a problem. I do remember a note from you about making those processors able to communicate with AMQP and wanting to contribute that back, which is great! I don't remember seeing a link to the code, though. I will have to go back and check again.

If I can find that link, I can try to give some more specific advice, but in general when I create a processor that interacts with an external endpoint, I will create a method that returns the client that I am using. It's often something super simple like:

protected Client getClient() {
    return client;
}

This way, in my unit test I can simply create a subclass that returns a mocked-out client:

final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
    @Override
    protected Client getClient() {
        return new MockClient();
    }
});

This way, I can easily mock the client out to do whatever I need, including things like throwing IOException to ensure that it is handled properly.
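
As a rough, framework-free illustration of the same idea, the sketch below shows an overridable getClient() being replaced by a client that always throws IOException, so the error path can be exercised. The Client interface, the Publisher class and its publish method are hypothetical stand-ins invented for this example; they are not NiFi or AMQP types.

```java
import java.io.IOException;

class MockClientDemo {
    // Builds a Publisher whose getClient() is overridden to return a failing mock.
    static Publisher failingPublisher() {
        return new Publisher(null) {
            @Override
            protected Client getClient() {
                return message -> { throw new IOException("simulated broker outage"); };
            }
        };
    }

    public static void main(String[] args) {
        // The IOException is caught inside publish(), so this prints "false".
        System.out.println(failingPublisher().publish("hello"));
    }
}

// Hypothetical stand-in for whatever client the processor wraps.
interface Client {
    void send(String message) throws IOException;
}

// Hypothetical component that always goes through getClient() to reach its client.
class Publisher {
    private final Client client;

    Publisher(Client client) {
        this.client = client;
    }

    // Tests override this method to supply a mock.
    protected Client getClient() {
        return client;
    }

    // Returns true on success, false if the client failed with an IOException.
    boolean publish(String message) {
        try {
            getClient().send(message);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Because the code under test only ever reaches the client through getClient(), the test never needs a real broker.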

Again, I'll try to find that link and offer more specific pointers if I can. If you have the link handy, that would be good, in case I'm not able to find the one that you sent last.

Thanks
-Mark


Re: Writing to a flowfile

Posted by DAVID SMITH <da...@btinternet.com>.
Mark

Thanks very much for all of your help. That works really well. I have also taken on board your other comments and implemented them on my home version; I will use it all at work tomorrow.

As you may have seen in a post I made in July, I have taken the put & get JMS processors and made a modified version for use with an AMQP broker. They appear to work well, and my boss (John Thorp) would like me to contribute them back to org.apache.nifi.

Before I can do that I need to write some JUnit tests, but I have no idea how to mock an AMQP broker/queue. To contribute the code for consideration, do I need to create my own branch in the development code, insert my code and then push it back up? Currently my code is on GitHub (link in July posts).

Thanks again for your help
Dave



RE: Writing to a flowfile

Posted by Mark Payne <ma...@hotmail.com>.
Dave,

Not a problem.

The FlowFile object itself is immutable. If you want to modify the FlowFile, you do so by asking 
the session to give you a new version of the FlowFile with some update. For instance, by adding 
an attribute or changing the content of the FlowFile.

So any call to session.putAttribute or session.write returns a new FlowFile. If you update
the line that calls putAttribute so that it stores the returned FlowFile into your 'parsed' variable,
you should be good to go.

So you would do:

FlowFile parsed = session.create(original);
parsed = session.putAttribute(parsed, CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());

Otherwise, you end up trying to modify the same version twice (once when you call session.putAttribute and 
again when you call session.write). This is what the message is complaining about.
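
To make the "most recent version" rule concrete, here is a toy model of it. ToyFlowFile and ToySession are hypothetical classes written only for this illustration; they are not NiFi's FlowFile or ProcessSession, and the real framework's bookkeeping is certainly more involved. The sketch only assumes that the session remembers the newest version it handed out and rejects older references.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class StaleReferenceDemo {
    public static void main(String[] args) {
        ToySession session = new ToySession();
        ToyFlowFile parsed = session.create();

        // Correct: keep the reference the session hands back after every update.
        parsed = session.putAttribute(parsed, "filename", "parsed.txt");
        parsed = session.putAttribute(parsed, "stage", "done");

        // Incorrect: discard the return value, then reuse the old reference.
        ToyFlowFile stale = parsed;
        session.putAttribute(stale, "a", "1");
        try {
            session.putAttribute(stale, "b", "2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Immutable toy record; NOT NiFi's FlowFile.
final class ToyFlowFile {
    final int version;
    final Map<String, String> attributes;

    ToyFlowFile(int version, Map<String, String> attributes) {
        this.version = version;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }
}

// Toy session that remembers only the newest version; NOT NiFi's ProcessSession.
final class ToySession {
    private ToyFlowFile latest;

    ToyFlowFile create() {
        latest = new ToyFlowFile(0, new HashMap<String, String>());
        return latest;
    }

    // Returns a new version; rejects any reference that is not the newest one.
    ToyFlowFile putAttribute(ToyFlowFile flowFile, String key, String value) {
        if (flowFile != latest) {
            throw new IllegalStateException("not the most recent version");
        }
        Map<String, String> updated = new HashMap<>(flowFile.attributes);
        updated.put(key, value);
        latest = new ToyFlowFile(flowFile.version + 1, updated);
        return latest;
    }
}
```

The second half of main mirrors the failing pattern: the first update succeeds but its result is thrown away, so the next call is made with an out-of-date reference.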

Just looking through the code, a few other comments that I would offer:

* The "static boolean error = false;" is likely to cause problems. All instances of your processor would get the same 'error' variable.
I would recommend you use an org.apache.nifi.util.BooleanHolder object (defined in the nifi-utils module) and define
it within your onTrigger method, rather than using a member variable.

* Experience has shown that with any log message, you should log the FlowFile that you are referring to. You can
also parameterize your log messages. For example:

logger.error("Failed to parse {}; routing to failure", new Object[] {original});

rather than

logger.error("parsing to failure");
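
On the first point above, a minimal sketch of a per-invocation flag follows. It swaps in java.util.concurrent.atomic.AtomicBoolean from the JDK in place of the BooleanHolder class mentioned above, purely so that the example runs without NiFi on the classpath. The handle method is a hypothetical stand-in for one onTrigger call, and the Runnable stands in for the parsing callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class LocalErrorFlagDemo {
    // Hypothetical stand-in for one onTrigger invocation; returns a relationship name.
    static String handle(final String input) {
        // Created per call, so other invocations and other instances never share it.
        final AtomicBoolean error = new AtomicBoolean(false);

        // Anonymous inner class standing in for the parsing callback. It cannot
        // reassign a captured local variable, but it can mutate the holder.
        Runnable callback = new Runnable() {
            @Override
            public void run() {
                try {
                    Integer.parseInt(input.trim());
                } catch (NumberFormatException e) {
                    error.set(true);
                }
            }
        };
        callback.run();

        return error.get() ? "failure" : "parsed";
    }

    public static void main(String[] args) {
        System.out.println(handle("42"));           // prints "parsed"
        System.out.println(handle("not a number")); // prints "failure"
    }
}
```

A static field, by contrast, would be shared by every instance of the class, so a failure on one input could leak into the routing decision for another.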


I hope this helps! Let us know if you're still having problems!

Thanks
-Mark


Re: Writing to a flowfile

Posted by DAVID SMITH <da...@btinternet.com>.
Mark

Thanks for your help. I have used the snippet of code you sent and it works, although I am fairly sure I haven't implemented it correctly: I have had to put all of my code in the onTrigger method instead of in the callback.

I also need to change the filename attribute of the parsed FlowFile, so I have inserted the following line:

session.putAttribute(parsed, CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());

But it gives me the following error:

2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5]
o.a.nifi.processors.standard.ParseMyData
ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b]
ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process
due to org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
is not the most recent version of this FlowFile within this session
(StandardProcessSession[id=21562]); rolling back session:
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
is not the most recent version of this FlowFile within this session
(StandardProcessSession[id=21562])

I have attached my processor class. I would be grateful if you could give it a quick look and tell me what I have done wrong.

Many thanks
Dave



RE: Writing to a flowfile

Posted by Mark Payne <ma...@hotmail.com>.
David,

In this case, since you want to keep the original intact, you will need to create a 'child' flowfile to write to.
You do this with ProcessSession.create(FlowFile)

So you will have code that looks something like this:

final FlowFile original = session.get();
if (original == null) {
  return;
}

// create a new 'child' FlowFile. The framework will automatically handle
// the provenance information so that 'parsed' is forked from 'original'.
FlowFile parsed = session.create(original);

// Get an OutputStream for the 'parsed' FlowFile
parsed = session.write(parsed, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream parsedOut) throws IOException {

        // Get an InputStream for the original
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream originalIn) throws IOException {
                // read from original FlowFile via originalIn
                // write to new FlowFile via parsedOut
            }
        });

    }
});
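
For the body of the inner callback, one possible shape is sketched below using only JDK classes. The parse method is a hypothetical helper and the trim-and-uppercase step is a placeholder for the real parsing logic. The sketch also assumes UTF-8 text and that the caller owns both streams, which is why it flushes the writer instead of closing it.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

class ParseStreamsDemo {
    // Reads lines from originalIn, applies a placeholder transformation,
    // and writes the result to parsedOut through a BufferedWriter.
    static void parse(InputStream originalIn, OutputStream parsedOut) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(originalIn, StandardCharsets.UTF_8));
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(parsedOut, StandardCharsets.UTF_8));

        String line;
        while ((line = reader.readLine()) != null) {
            // Placeholder for the real parsing step.
            writer.write(line.trim().toUpperCase(Locale.ROOT));
            writer.write('\n');
        }

        // Flush rather than close: in this sketch the caller owns both streams.
        writer.flush();
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("a\n b \n".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        parse(in, out);
        System.out.print(out.toString("UTF-8"));
    }
}
```

Inside the nested callbacks, a call such as parse(originalIn, parsedOut) would be the point where the BufferedWriter is directed at the new FlowFile's OutputStream rather than the original's.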

Does this give you what you need? If anything is still unclear, let us know!

Thanks
-Mark
