Posted to users@nifi.apache.org by Mark Payne <ma...@hotmail.com> on 2020/11/18 21:08:35 UTC

Re: [EXTERNAL] horizontal merge

It is also worth noting that instead of:
def mergedFile = session.create()

You should be using:
def mergedFile = session.create(flowFileList)

This will do 2 things:
(a) it will result in the merged FlowFile inheriting the attributes from the parents
(b) it will properly establish the linkage for the data provenance

Thanks
-Mark
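Here is a minimal sketch that pulls together the advice in this thread. It is not code from the thread. It shows the two-in, one-out pattern for an ExecuteGroovyScript body:

- back off when fewer than two FlowFiles are available;
- create the merged FlowFile with `session.create(flowFileList)`;
- remove both inputs and transfer the merged FlowFile.

`mergePair` and its `combine` closure are hypothetical names. The sketch reads and writes through `session.exportTo`/`session.importFrom` instead of stream callbacks. That simplification assumes small UTF-8 payloads. It is also worth checking that ExecuteGroovyScript's session wrapper behaves identically for every call used here.

```groovy
// Hypothetical helper for an ExecuteGroovyScript body.
// `combine` turns the two input payloads (UTF-8 strings) into the merged payload.
def mergePair(session, context, relSuccess, Closure<String> combine) {
    def flowFiles = session.get(2)
    if (flowFiles.size() != 2) {
        // Not enough input yet: hand back anything we took, and yield so the
        // processor is not re-triggered immediately.
        session.rollback()
        context.yield()
        return null
    }

    // Read each input fully into memory (only sensible for small payloads).
    def payloads = flowFiles.collect { ff ->
        def buf = new ByteArrayOutputStream()
        session.exportTo(ff, buf)
        buf.toString('UTF-8')
    }

    // create(parents): the child inherits attributes from its parents and is
    // linked to both inputs in data provenance.
    def merged = session.create(flowFiles)
    def bytes = combine(payloads[0], payloads[1]).getBytes('UTF-8')
    merged = session.importFrom(new ByteArrayInputStream(bytes), merged)

    // Every FlowFile pulled or created is now either removed or transferred.
    session.remove(flowFiles)
    session.transfer(merged, relSuccess)
    return merged
}
```

In the script body this might be called as `mergePair(session, context, REL_SUCCESS) { a, b -> a + b }`, with the closure doing the real field-level merge.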


On Nov 18, 2020, at 3:58 PM, Joe Witt <jo...@gmail.com> wrote:

Geoffrey

The process session is requiring you to account for all flow files you've either created or pulled from the queue.

You have logic which pulls up to 2 things. It could pull just one, in which case you return without transferring or removing it. You would get the above error in those cases.

In the case you get two items you read from 1 and 2 and create a 3rd.  You now appear to remove 1 and 2 then transfer the 3rd.  That should be fine.

Thanks
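Joe's rule can be made concrete with a toy model. The sketch below is a hypothetical `unaccounted` helper. It does not reproduce NiFi's real bookkeeping. It treats every FlowFile pulled or created as needing either a transfer or a remove before commit, and reports the ones that got neither.

```groovy
// Toy model (not NiFi's implementation) of the session accounting rule:
// at commit, every FlowFile pulled from the queue or created in the session
// must have been transferred to a relationship or removed.
Set<String> unaccounted(Collection<String> pulled, Collection<String> created,
                        Collection<String> transferred, Collection<String> removed) {
    Set<String> handled = (transferred + removed) as Set
    return ((pulled + created) as Set) - handled
}
```

Take the case where only one FlowFile is available and the script just returns. Then `unaccounted(['ff1'], [], [], [])` is `['ff1']`, which is the situation Joe describes. In the two-FlowFile path, removing both inputs and transferring the merged file leaves nothing behind. Rolling back first avoids the problem by returning the FlowFile to the queue.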

On Wed, Nov 18, 2020 at 1:50 PM Greene (US), Geoffrey N <ge...@boeing.com> wrote:
Session.remove()! That’s very helpful, and it makes my numbers come out correctly. I’m still getting “transfer relationship not specified”, though.
Here’s where I’m at now:

// declared outside the closures so the parsed JSON is still available afterwards
def json1, json2

session.read(flowFile1, {inputStream ->
    json1 = new groovy.json.JsonSlurper().parse(inputStream)
} as InputStreamCallback)

session.read(flowFile2, {inputStream ->
    json2 = new groovy.json.JsonSlurper().parse(inputStream)
} as InputStreamCallback)

def mergedFile = session.create()
mergedFile = session.write(mergedFile, {outputStream ->
    outputStream.write("new information".bytes)
} as OutputStreamCallback)
session.remove(flowFile1)
session.remove(flowFile2)
session.transfer(mergedFile, REL_SUCCESS)



Geoffrey Greene
Senior Software Ninjaneer
(703) 414 2421
The Boeing Company

From: Chris Sampson [mailto:chris.sampson@naimuri.com]
Sent: Wednesday, November 18, 2020 3:33 PM
To: users@nifi.apache.org
Subject: Re: [EXTERNAL] Re: horizontal merge


You may want a call to `session.remove(flowFile1)` instead of transferring it.

Cheers,

Chris Sampson
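Applied to the script quoted below, Chris's suggestion means keeping flowFile2 with the new content and removing flowFile1 instead of transferring it. Here is a minimal sketch of that variant, assuming small UTF-8 payloads. `enrichSecond` and `enrich` are hypothetical names. As in a simplification, `exportTo`/`importFrom` stand in for the read/write callbacks to keep the example short.

```groovy
// Hypothetical helper: write merged data into flowFile2 and drop flowFile1.
def enrichSecond(session, relSuccess, flowFile1, flowFile2, Closure<String> enrich) {
    // Read a FlowFile's full content as a UTF-8 string.
    def text = { ff ->
        def buf = new ByteArrayOutputStream()
        session.exportTo(ff, buf)
        buf.toString('UTF-8')
    }
    def newContent = enrich(text(flowFile1), text(flowFile2))

    // Overwrite flowFile2's content and keep the returned (latest) reference.
    flowFile2 = session.importFrom(new ByteArrayInputStream(newContent.getBytes('UTF-8')), flowFile2)

    session.remove(flowFile1)               // flowFile1 "goes away"; no transfer needed
    session.transfer(flowFile2, relSuccess)
    return flowFile2
}
```

There is one trade-off, as I understand it. Creating a child with `session.create(flowFileList)` records provenance links to both inputs. This variant records no such link from flowFile2 back to flowFile1, which is simply dropped.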

On Wed, 18 Nov 2020, 20:03 Greene (US), Geoffrey N, <ge...@boeing.com> wrote:
I've gotten closer with grabbing two files and processing them, but I still have something wrong in the paradigm. Here's what I've narrowed it down to (this is in an ExecuteGroovyScript, BTW; I hope to translate it to an InvokeScriptedProcessor later on, so I can define the transfer endpoints):

// get two files, always two.  Read file1, and write certain fields to file2
flowFileList = session.get(2)
if (flowFileList.size() != 2)  return

flowFile1 = flowFileList.get(0)
flowFile2 = flowFileList.get(1)

if(!flowFile1) return
if(!flowFile2) return

// read() returns void, so don't assign its result back to flowFile1
session.read(flowFile1, {inputStream ->
    def slurper = new groovy.json.JsonSlurper()
    json1 = slurper.parse(inputStream)
} as InputStreamCallback)

flowFile2 = session.write(flowFile2, {outputStream ->
    outputStream.write("foo plus some data from json1".bytes)
} as OutputStreamCallback)

// I really don't want to TRANSFER flow file 1, I want it to go AWAY, but
// I have to do this
session.transfer(flowFile1, REL_SUCCESS) // << isn't needed
session.transfer(flowFile2, REL_SUCCESS)


Both files do get read correctly and output to success, but NiFi always throws the error "transfer relationship not specified", which, I gather, means that the call to transfer failed because one file (probably flowFile1) is not up to date....

Any thoughts?  How do you grab two files at once and then transfer them?

I really only want to transfer just the ONE out, since the data was merged in, but I can manage with two files if I have to make # inputs = # outputs

Thanks


-----Original Message-----
From: Greene (US), Geoffrey N
Sent: Tuesday, November 17, 2020 8:30 PM
To: users@nifi.apache.org
Subject: RE: [EXTERNAL] Re: horizontal merge

The data is actually coming from a REST call, which provides JSON. That part is pretty smooth at this point.

The challenge seems to be associating two different numbers of records and combining them back into one single file. In other words, how do I know when I've processed all the records in BOTH files, so I can produce one single output file? I like your suggestion of rolling back the session and returning, though, and I will look into that. It might mean the file has to be processed as one single file, rather than handled as splits/merges.

I've also been playing with MergeRecord and MergeContent, and I might be making some progress. My struggle now is figuring out when I know all records are processed, since I don't have a constant number of results to watch for. I may end up writing a file appender and just appending "as I go", so I don't have to do a count.

-----Original Message-----
From: Matt Burgess [mailto:mattyb149@apache.org]
Sent: Tuesday, November 17, 2020 4:22 PM
To: users@nifi.apache.org
Subject: [EXTERNAL] Re: horizontal merge


Geoffrey,

Where are the two flowfiles coming from? This use case is often handled in NiFi using LookupRecord with one of the LookupService implementations (REST, RDBMS, CSV, etc.). We don't currently have a mechanism (besides scripting) to do enrichment/lookups from flowfiles.

For your script, you can do session.get(2) and then check the size of the returned List. If it is less than 2, you can rollback the session and return (possibly yielding first if you don't want to check again rapidly).

Regards,
Matt

On Tue, Nov 17, 2020 at 4:13 PM Greene (US), Geoffrey N <ge...@boeing.com> wrote:
>
> I’m trying to glue two flow files together HORIZONTALLY. That is,
>
> Flowfile1:
>
> ID,STARTINGLETTER
> 1,A
> 2,B
>
> And flowfile2:
>
> ID, WORD
> 1,Apple
> 2, Ball
> 3, Cat
>
> I want it to become:
>
> ID, STARTINGLETTER, WORD
> 1,A,Apple
> 2,B,Ball
> 3,,Cat
>
> The only way I’ve been able to figure out how to do this is to write a custom “InvokeGroovyProcessor” that takes two flowfiles, reads them both, and then concatenates them.
>
> I’m having trouble figuring out how to pop TWO flow files off the queue (the order doesn’t really matter for now) and write one. I’ve tried:
>
>     @Override
>     void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
>         try {
>             def session = sessionFactory.createSession()
>             while (session.getQueueSize() != 2) {
>                 Thread.sleep(1000)
>                 log.debug("sleeping")
>             }
>             // we never get here
>             log.debug("found two flow files")
>             // get BOTH flowfiles
>             def flowFileList = session.get(2)
>             def flowFile1 = flowFileList.get(0)
>             def flowFile2 = flowFileList.get(1)
>             // now do the glue
>
> Or is there a better way?
>
> Thanks!
>
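For inputs small enough to hold in memory, the "glue" step is a full outer join on ID. Both files are read completely before joining, so there is no record count to track. The sketch below is plain Groovy with no NiFi API, so it could sit inside whichever script reads the two FlowFiles. `joinById`, `parseCsv` and `toCsv` are hypothetical helpers. The CSV parsing is deliberately naive: it handles no quoted fields or embedded commas. Lists of maps produced by `groovy.json.JsonSlurper` could go straight into `joinById`.

```groovy
// Full outer join of two record lists on `key`. Rows come out in the order
// their key is first seen (left list first). If both sides share a non-key
// column, the right-hand value overwrites the left-hand one.
List<Map> joinById(List<Map> left, List<Map> right, String key) {
    def byKey = new LinkedHashMap()
    (left + right).each { Map rec ->
        // Groovy's Map.get(key, default) inserts the default when the key is absent.
        byKey.get(rec[key], [:]).putAll(rec)
    }
    return byKey.values() as List
}

// Naive CSV reader: first line is the header, no quoting, values trimmed.
List<Map> parseCsv(String text) {
    def lines = text.readLines().findAll { it.trim() }
    def header = lines[0].split(',')*.trim()
    return lines.drop(1).collect { String line ->
        def cells = line.split(',', -1)*.trim()
        [header, cells].transpose().collectEntries { it }
    }
}

// Render rows in a fixed column order; missing values become empty strings.
String toCsv(List<Map> rows, List<String> columns) {
    def body = rows.collect { row ->
        columns.collect { c -> row[c] == null ? '' : row[c].toString() }.join(',')
    }
    return ([columns.join(',')] + body).join('\n')
}
```

Run on the two files above with columns `['ID', 'STARTINGLETTER', 'WORD']`, this yields `1,A,Apple`, `2,B,Ball` and `3,,Cat` under the merged header.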


Re: [EXTERNAL] horizontal merge

Posted by Mark Payne <ma...@hotmail.com>.
No, you do still need to handle that condition as Joe was mentioning.
The easiest route would be:

if (flowFileList.size() != 2) {
  session.rollback();
  return;
}

On Nov 18, 2020, at 4:16 PM, Greene (US), Geoffrey N <ge...@boeing.com> wrote:

The create(flowFileList) did the trick.  No more warnings now!

Joe, you thought that I had a bug where if I don’t pull two I should NOT just be returning.  Is there something I should be doing instead?  Isn’t
flowFileList = session.get(2)
if (flowFileList.size() != 2)
{
       return
}
Enough?  I assumed that since this is an ExecuteGroovyScript, it does a rollback when you do a return…

Thanks!


RE: [EXTERNAL] horizontal merge

Posted by "Greene (US), Geoffrey N" <ge...@boeing.com>.
The create(flowFileList) did the trick.  No more warnings now!

Joe, you thought that I had a bug where if I don’t pull two I should NOT just be returning.  Is there something I should be doing instead?  Isn’t
flowFileList = session.get(2)
if (flowFileList.size() != 2)
{
       return
}
Enough?  I assumed that since this is an ExecuteGroovyScript, it does a rollback when you do a return…

Thanks!
