Posted to dev@nifi.apache.org by Rick Braddy <rb...@softnas.com> on 2015/09/12 01:00:15 UTC

Transfer relationship not specified (FlowFileHandlingException)

Hi,

I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:

2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified

I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship output is connected to another processor's input as it should be.

Any suggestions for troubleshooting?

Rick



RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Joe Witt <jo...@gmail.com>.
We should fix the wording.  It should just state that the flowfile was never
transferred and must be accounted for: "Session.transfer never called for
flowfile X. This is a bug in the processor and must be corrected."  As
written it is ambiguous.
On Sep 11, 2015 7:39 PM, "Mark Payne" <ma...@hotmail.com> wrote:

> Rick,
> This error message isn't indicating that there's no Connection for the
> Relationship, but rather that the FlowFile was never transferred.
> I.e., there was never a call to session.transfer() for that FlowFile.
> Thanks-Mark
>
> > From: rbraddy@softnas.com
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> (FlowFileHandlingException)
> > Date: Fri, 11 Sep 2015 23:25:33 +0000
> >
> > Some more details:
> >
> > 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> > 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> > 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow
> > ...
> >
> > So it seems there's some issue with each of the FlowFiles...
> >
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Friday, September 11, 2015 6:00 PM
> > To: dev@nifi.apache.org
> > Subject: Transfer relationship not specified (FlowFileHandlingException)
> >
> > Hi,
> >
> > I have a processor that appears to be creating FlowFiles correctly
> (modified a standard processor), but when it goes to commit() the session,
> an exception is raised:
> >
> > 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> >
> > I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship output is connected to another processor's input as it should be.
> >
> > Any suggestions for troubleshooting?
> >
> > Rick
> >
> >
>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
Mark,

The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.

Rick

       final ListIterator<File> itr = files.listIterator();
        FlowFile flowFile = null;
        try {
            while (itr.hasNext()) {
                final File file = itr.next();
                final Path filePath = file.toPath();
                final Path relativePath = filePath.relativize(filePath.getParent());
                String relativePathString = relativePath.toString() + "/";
                if (relativePathString.isEmpty()) {
                    relativePathString = "./";
                }
                final Path absPath = filePath.toAbsolutePath();
                final String absPathString = absPath.getParent().toString() + "/";

                final long importStart = System.nanoTime();
                String fileType = "directory";
                flowFile = session.create();
                if (file.isFile()){
                    fileType = "file";
                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
                }
                final long importNanos = System.nanoTime() - importStart;
                final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

                flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
                flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
                flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                Map<String, String> attributes = getAttributesFromFile(filePath);
                if (attributes.size() > 0) {
                    flowFile = session.putAllAttributes(flowFile, attributes);
                }

                session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
                session.transfer(flowFile, REL_SUCCESS);
                logger.info("added {} to flow", new Object[]{flowFile});

                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
                    queueLock.lock();
                    try {
                        while (itr.hasNext()) {
                            final File nextFile = itr.next();
                            fileQueue.add(nextFile);
                            inProcess.remove(nextFile);
                        }
                    } finally {
                        queueLock.unlock();
                    }
                }
            }
            session.commit();
        } catch (final Exception e) {
            logger.error("Failed to transfer files due to {}", e);

-----Original Message-----
From: Mark Payne [mailto:markap14@hotmail.com] 
Sent: Friday, September 11, 2015 6:39 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Rick,
This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
I.e., there was never a call to session.transfer() for that FlowFile.
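In rough terms, the framework expects something like the following inside onTrigger() before commit() (a minimal sketch, not your processor; REL_SUCCESS stands in for whatever relationship you registered):

    FlowFile incoming = session.get();
    if (incoming == null) {
        return;
    }
    FlowFile child = session.create(incoming);   // new FlowFile derived from the incoming one
    // ... write or import content into 'child' here ...
    session.transfer(child, REL_SUCCESS);        // the child is accounted for
    session.remove(incoming);                    // the incoming FlowFile must also be accounted for
    session.commit();                            // otherwise this throws FlowFileHandlingException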
Thanks-Mark

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
> Date: Fri, 11 Sep 2015 23:25:33 +0000
> 
> Some more details:
> 
> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> 
> So it seems there's some issue with each of the FlowFiles...
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com]
> Sent: Friday, September 11, 2015 6:00 PM
> To: dev@nifi.apache.org
> Subject: Transfer relationship not specified 
> (FlowFileHandlingException)
> 
> Hi,
> 
> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> 
> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> 
> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship output is connected to another processor's input as it should be.
> 
> Any suggestions for troubleshooting?
> 
> Rick
> 
> 

Re: is not the most recent version of this FlowFile within this session

Posted by Mark Payne <ma...@hotmail.com>.
plj,

It looks like you're running into the same issue that Rick hit a week or so ago. You need to be sure to transfer the 'original' FlowFile somewhere
or otherwise remove it from the flow. The discussion is below, for more clarity.
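Roughly, either of these is fine (a sketch only; REL_SUCCESS and REL_ORIGINAL are placeholder relationship names):

    FlowFile original = session.get();
    if (original == null) {
        return;
    }
    FlowFile derived = session.create(original);
    // ... populate 'derived' ...
    session.transfer(derived, REL_SUCCESS);

    // either route the original somewhere explicitly...
    session.transfer(original, REL_ORIGINAL);
    // ...or drop it from the flow instead:
    // session.remove(original);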

Thanks
-Mark


> Begin forwarded message:
> 
> From: Rick Braddy <rb...@softnas.com>
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> Date: September 12, 2015 at 5:04:31 PM EDT
> To: "dev@nifi.apache.org" <de...@nifi.apache.org>
> Reply-To: dev@nifi.apache.org
> 
> All,
> 
> Thank you very much for your help on this.  My processor is now working perfectly.  The key was calling session.remove() after reading each inbound flowfile.
> 
> I certainly learned a lot from this experience and appreciate all the guidance and training.  Hopefully this thread will help others in the future.
> 
> I agree with Joe's earlier comment - the error message wasn't especially helpful.  If the error message had printed out the offending record it may have been more obvious that the issue was related to the inbound flowfile/relationship instead of the outbound one.  The solution had nothing to do with "transfer", so the current message was even more misleading.
> 
> What might help in the future would be an inventory of NiFi exceptions, along with an explanation of likely causes and recommendations for resolving them.  I realize this is a lot of work to document, but as the project matures, this kind of thing, along with an API reference guide, would be very helpful for newbie developers.
> 
> Thanks again!
> 
> On my way again...
> 
> Rick
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com] 
> Sent: Saturday, September 12, 2015 3:31 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> 
> Mark,
> 
> Okay. That helps a lot!!
> 
> The thing is, like the original GetFile, the new processor can read and then delete the original file (it's still an option in case the use-case is to "migrate" the files instead of "replicate" them).
> 
> So given that twist, it sounds like session.remove() may make the most sense, similar to the original GetFile processor.
> 
> Thank you very much for this succinct explanation.  Not sure why I couldn't figure those things out from reading the Developer Guide - perhaps some examples or similar dialog to the below would help others.
> 
> Thanks!
> Rick
> 
> -----Original Message-----
> From: Mark Payne [mailto:markap14@hotmail.com]
> Sent: Saturday, September 12, 2015 3:26 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> 
> Rick,
> Some great questions!
> 1. If session.remove() is called, and then the session is rolled back, the original FlowFile will be requeued so that it can be re-processed.
> 2. If you call session.importFrom, it will overwrite the contents of the FlowFile. However, this is done in such a way that if you overwrite it and then call session.rollback, again the original FlowFile will be re-queued, not the modified version.
> 3. re: @SupportsBatching. There's not necessarily a lot of overhead in calling onTrigger() repeatedly, but rather the overhead lives within session.commit(). When you use the @SupportsBatching annotation, and then configure the Processor to batch, for say 25 ms (in the Scheduling Tab of the Processor Configuration dialog), what you are doing is telling the framework that calls to session.commit() can be done asynchronously, essentially, so that we can wait up to 25 milliseconds and then perform a single update to the FlowFile and Provenance Repositories that includes all of the information for those sessions. So it is the session commit that is batched up.
> GetFile doesn't use this approach because if it did this, it could pull in a FlowFile, call session.commit (which may not actually update the repositories if the updates are batched), and then delete the original file. At this point, if the node died, it would have lost that session commit (which means that the file will not be in the flow on restart) AND it will have already deleted the file from disk.
> So in order to avoid that, it does not support batching. When we call session.commit(), we know that the repositories are updated and the FlowFile is safe within NiFi. Now that NiFi is responsible for handling the file, we can delete the original.
> In your case, you can support batching because you're not deleting the original. So if you haven't yet committed the session and the node dies, it's okay; you can just re-process the file.
> Does that make sense?  If not, please let me know because this isn't an easy concept necessarily to communicate, but it needs to be communicated well.
> Thanks-Mark
> 
>> From: rbraddy@softnas.com
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>> Date: Sat, 12 Sep 2015 20:12:35 +0000
>> 
>> Mark,
>> 
>> That's exactly the advice and guidance that I needed.  A couple of questions for #2 and #3 cases.
>> 
>> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
>> 
>>>> when session.remove() is called and a session.rollback() occurs due to an error, does the remove operation also get rolled back so the original inbound flowfiles are available to be reprocessed?
>> 
>> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile.
>> 
>>>> I agree this makes a lot of sense and would simplify things 
>>>> greatly.  In this case, would session.importFrom overwrite the 
>>>> contents of the original inbound flowfile or append to it?  (need a 
>>>> way to replace the file path with the file's contents)
>> 
>>>> Is there a significant amount of context-switch overhead in onTrigger() being called repeatedly vs. batching/queueing?  Is this effectively what @SupportsBatching does, to make multiple immediate calls back to onTrigger() before yielding to other threads? 
>> 
>>>> Why does the GetFile processor use the internal batching/queuing approach?
>> 
>> Thank you.
>> Rick
>> 
>> -----Original Message-----
>> From: Mark Payne [mailto:markap14@hotmail.com]
>> Sent: Saturday, September 12, 2015 2:59 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>> 
>> Rick,
>> You must account for all FlowFiles that you pull in from queues. You can account for a FlowFile by either removing it or transferring it. In terms of what to do with the incoming FlowFiles, you have three options, really:
>> 1. Transfer to an 'original' relationship. If you think this will be an "unused" relationship, then this is probably not a good option for you. I would never recommend creating a relationship that you don't believe will be useful.
>> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
>> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile. This will then modify the content of the FlowFile, rather than creating a child. The FlowFile would then just be routed to success.
>> Regarding batching: generally for this type of Processor, I would recommend you do not pull in 10 FlowFiles at a time and keep a collection of them. Rather, I would pull in a single FlowFile and process it. I would then use the @SupportsBatching annotation. This allows the operator to make the trade-off of throughput vs. latency as appropriate for the particular flow. This approach also allows the Processor's code to be a little simpler, as it doesn't need to maintain Collections of FlowFiles - it just operates on a single FlowFile at a time.
>> Hope this helps! Let us know if you have any further questions or issues.
>> Thanks-Mark
>> 
>>> From: rbraddy@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> Date: Sat, 12 Sep 2015 18:16:48 +0000
>>> 
>>> I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.
>>> 
>>> Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.
>>> 
>>> Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.
>>> 
>>> Questions:
>>> 
>>> 1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?
>>> 
>>> 2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?
>>> 
>>> 3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
>>> 
>>> Thanks
>>> Rick
>>> 
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>> Sent: Saturday, September 12, 2015 8:50 AM
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> 
>>> More information... Upon further debugging within the StandardProcessSession checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.
>>> 
>>> The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:
>>> 
>>>       //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
>>>        final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
>>>        for (final StandardRepositoryRecord record : records.values()) {
>>>            if (record.isMarkedForDelete()) {
>>>                continue;
>>>            }
>>>            final Relationship relationship = record.getTransferRelationship();
>>>         if (relationship == null) {
>>>                rollback();
>>> -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
>>>            }
>>> 
>>> I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.
>>> 
>>> Rick
>>> 
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>> Sent: Saturday, September 12, 2015 7:42 AM
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> 
>>> Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.
>>> 
>>> Rick
>>> 
>>> -----Original Message-----
>>> From: Joe Witt [mailto:joe.witt@gmail.com]
>>> Sent: Saturday, September 12, 2015 7:41 AM
>>> To: dev@nifi.apache.org
>>> Subject: Re: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> 
>>> Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.
>>> 
>>> On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
>>>> Joe,
>>>> 
>>>> Replies below.
>>>> 
>>>> Rick
>>>> 
>>>> 
>>>> -----Original Message-----
>>>> From: Joe Witt [mailto:joe.witt@gmail.com]
>>>> Sent: Saturday, September 12, 2015 7:02 AM
>>>> To: dev@nifi.apache.org
>>>> Subject: Re: Transfer relationship not specified
>>>> (FlowFileHandlingException)
>>>> 
>>>> Rick
>>>> 
>>>> Can you show what is happening in the exception handling part of your code as well?
>>>> 
>>>> Yes.  In this version, sending (empty) flowfiles for directory entries is bypassed so only files get processed, and setting attributes is also disabled (which did not help).
>>>> 
>>>> Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
>>>> 
>>>> Here's the entire onTrigger method:
>>>> 
>>>>    @Override
>>>>    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>>>>        final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>>>>        final ProcessorLog logger = getLogger();
>>>> 
>>>>        final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>>>>        if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>>>>            try {
>>>>                final Set<File> filelist = getFileList(context, 
>>>> session);
>>>> 
>>>>                queueLock.lock();
>>>>                try {
>>>>                    filelist.removeAll(inProcess);
>>>>                    if (!keepingSourceFile) {
>>>>                        filelist.removeAll(recentlyProcessed);
>>>>                    }
>>>> 
>>>>                    fileQueue.clear();
>>>>                    fileQueue.addAll(filelist);
>>>> 
>>>>                    queueLastUpdated.set(System.currentTimeMillis());
>>>>                    recentlyProcessed.clear();
>>>> 
>>>>                    if (filelist.isEmpty()) {
>>>>                        context.yield();
>>>>                    }
>>>>                } finally {
>>>>                    queueLock.unlock();
>>>>                }
>>>>            } finally {
>>>>                filelistLock.unlock();
>>>>            }
>>>>        }
>>>> 
>>>>        final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>>>>        final List<File> files = new ArrayList<>(batchSize);
>>>>        queueLock.lock();
>>>>        try {
>>>>            fileQueue.drainTo(files, batchSize);
>>>>            if (files.isEmpty()) {
>>>>                return;
>>>>            } else {
>>>>                inProcess.addAll(files);
>>>>            }
>>>>        } finally {
>>>>            queueLock.unlock();
>>>>        }
>>>> 
>>>>        final ListIterator<File> itr = files.listIterator();
>>>>        FlowFile flowFile = null;
>>>>        try {
>>>>            while (itr.hasNext()) {
>>>>                final File file = itr.next();
>>>>                final Path filePath = file.toPath();
>>>>                final Path relativePath = filePath.relativize(filePath.getParent());
>>>>                String relativePathString = relativePath.toString() + "/";
>>>>                if (relativePathString.isEmpty()) {
>>>>                    relativePathString = "./";
>>>>                }
>>>>                final Path absPath = filePath.toAbsolutePath();
>>>>                final String absPathString =
>>>> absPath.getParent().toString() + "/";
>>>> 
>>>>                final long importStart = System.nanoTime();
>>>>                String fileType = "directory";
>>>>                if (file.isFile()){
>>>>                    fileType = "file";
>>>>                    flowFile = session.create();
>>>>                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>>>                }
>>>>                else
>>>>                {
>>>>                   logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>>>>                   continue; // ******* SKIP DIRECTORIES FOR NOW ****
>>>>                }
>>>> 
>>>>                final long importNanos = System.nanoTime() - importStart;
>>>>                final long importMillis = 
>>>> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>>> 
>>>>              //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>>>              //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>>>              //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>>>              //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>>>              //  Map<String, String> attributes = getAttributesFromFile(filePath);
>>>>              //  if (attributes.size() > 0) {
>>>>              //      flowFile = session.putAllAttributes(flowFile, attributes);
>>>>              //  }
>>>> 
>>>>                final String fileURI = file.toURI().toString();
>>>>                session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>>>>                session.transfer(flowFile, REL_SUCCESS);
>>>>                logger.info("added {} to flow", new 
>>>> Object[]{flowFile});
>>>> 
>>>>                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>>>                    queueLock.lock();
>>>>                    try {
>>>>                        while (itr.hasNext()) {
>>>>                            final File nextFile = itr.next();
>>>>                            fileQueue.add(nextFile);
>>>>                            inProcess.remove(nextFile);
>>>>                        }
>>>>                    } finally {
>>>>                        queueLock.unlock();
>>>>                    }
>>>>                }
>>>>            }
>>>>            session.commit();
>>>>        } catch (final Exception e) {
>>>>            logger.error("Failed to transfer files due to {}", e);
>>>>            context.yield();
>>>> 
>>>> 
>>>>            // anything that we've not already processed needs to be put back on the queue
>>>>            if (flowFile != null) {
>>>>                session.remove(flowFile);
>>>>            }
>>>>        } finally {
>>>>            queueLock.lock();
>>>>            try {
>>>>                inProcess.removeAll(files);
>>>>                recentlyProcessed.addAll(files);
>>>>            } finally {
>>>>                queueLock.unlock();
>>>>            }
>>>>        }
>>>>    }
>>>> 
>>>> }
>>>> 
>>>> Also please confirm which codebase you're running against.  Latest HEAD of master?
>>>> 
>>>> I'm using a snap from GitHub that's several weeks old from August 
>>>> 25th (it's working fine with the original GetFile processor, which 
>>>> this code was derived from)
>>>> 
>>>> Thanks
>>>> Joe
>>>> 
>>>> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
>>>>> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
>>>>> 
>>>>> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>>>>> 
>>>>> Not being intimate with the internals of the framework yet, I'm not sure what would cause this.
>>>>> 
>>>>> Rick
>>>>> 
>>>>> -----Original Message-----
>>>>> From: Rick Braddy
>>>>> Sent: Friday, September 11, 2015 8:26 PM
>>>>> To: dev@nifi.apache.org
>>>>> Subject: RE: Transfer relationship not specified
>>>>> (FlowFileHandlingException)
>>>>> 
>>>>> Mark,
>>>>> 
>>>>> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
>>>>> 
>>>>> Rick
>>>>> 
>>>>>       final ListIterator<File> itr = files.listIterator();
>>>>>        FlowFile flowFile = null;
>>>>>        try {
>>>>>            while (itr.hasNext()) {
>>>>>                final File file = itr.next();
>>>>>                final Path filePath = file.toPath();
>>>>>                final Path relativePath = filePath.relativize(filePath.getParent());
>>>>>                String relativePathString = relativePath.toString() + "/";
>>>>>                if (relativePathString.isEmpty()) {
>>>>>                    relativePathString = "./";
>>>>>                }
>>>>>                final Path absPath = filePath.toAbsolutePath();
>>>>>                final String absPathString =
>>>>> absPath.getParent().toString() + "/";
>>>>> 
>>>>>                final long importStart = System.nanoTime();
>>>>>                String fileType = "directory";
>>>>>                flowFile = session.create();
>>>>>                if (file.isFile()){
>>>>>                    fileType = "file";
>>>>>                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>>>>                }
>>>>>                final long importNanos = System.nanoTime() - importStart;
>>>>>                final long importMillis = 
>>>>> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>>>> 
>>>>>                flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>>>>                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>>>>                flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>>>>                flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>>>>                Map<String, String> attributes = getAttributesFromFile(filePath);
>>>>>                if (attributes.size() > 0) {
>>>>>                    flowFile = session.putAllAttributes(flowFile, attributes);
>>>>>                }
>>>>> 
>>>>>                session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>>>>                session.transfer(flowFile, REL_SUCCESS);
>>>>>                logger.info("added {} to flow", new 
>>>>> Object[]{flowFile});
>>>>> 
>>>>>                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>>>>                    queueLock.lock();
>>>>>                    try {
>>>>>                        while (itr.hasNext()) {
>>>>>                            final File nextFile = itr.next();
>>>>>                            fileQueue.add(nextFile);
>>>>>                            inProcess.remove(nextFile);
>>>>>                        }
>>>>>                    } finally {
>>>>>                        queueLock.unlock();
>>>>>                    }
>>>>>                }
>>>>>            }
>>>>>            session.commit();
>>>>>        } catch (final Exception e) {
>>>>>            logger.error("Failed to transfer files due to {}", 
>>>>> e);
>>>>> 
>>>>> -----Original Message-----
>>>>> From: Mark Payne [mailto:markap14@hotmail.com]
>>>>> Sent: Friday, September 11, 2015 6:39 PM
>>>>> To: dev@nifi.apache.org
>>>>> Subject: RE: Transfer relationship not specified
>>>>> (FlowFileHandlingException)
>>>>> 
>>>>> Rick,
>>>>> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
>>>>> I.e., there was never a call to session.transfer() for that FlowFile.
>>>>> Thanks-Mark
>>>>> 
>>>>>> From: rbraddy@softnas.com
>>>>>> To: dev@nifi.apache.org
>>>>>> Subject: RE: Transfer relationship not specified
>>>>>> (FlowFileHandlingException)
>>>>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>>>> 
>>>>>> Some more details:
>>>>>> 
>>>>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>>>>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>>>>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>>>>> 
>>>>>> So it seems there's some issue with each of the FlowFiles...
>>>>>> 
>>>>>> -----Original Message-----
>>>>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>>>>> Sent: Friday, September 11, 2015 6:00 PM
>>>>>> To: dev@nifi.apache.org
>>>>>> Subject: Transfer relationship not specified
>>>>>> (FlowFileHandlingException)
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
>>>>>> 
>>>>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>>>>> 
>>>>>> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship output is connected to another processor's input as it should be.
>>>>>> 
>>>>>> Any suggestions for troubleshooting?
>>>>>> 
>>>>>> Rick
>>>>>> 
>>>>>> 
>>>>> 


RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
All,

Thank you very much for your help on this.  My processor is now working perfectly.  The key was calling session.remove() after reading each inbound flowfile.
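For reference, the working pattern boiled down to roughly this (a simplified sketch rather than the full processor; KEEP_SOURCE_FILE, REL_SUCCESS and the batch size come from the snippets earlier in the thread, and imports are omitted as in those snippets):

    final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
    final List<FlowFile> inbound = session.get(10);           // each inbound FlowFile's content is a file path
    for (final FlowFile pathFlowFile : inbound) {
        final ByteArrayOutputStream pathBytes = new ByteArrayOutputStream();
        session.read(pathFlowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                final byte[] buffer = new byte[4096];
                int len;
                while ((len = in.read(buffer)) != -1) {
                    pathBytes.write(buffer, 0, len);          // copy the path string out of the content
                }
            }
        });
        final Path sourcePath = Paths.get(new String(pathBytes.toByteArray(), StandardCharsets.UTF_8).trim());

        FlowFile outbound = session.create(pathFlowFile);     // child that carries the file's actual content
        outbound = session.importFrom(sourcePath, keepingSourceFile, outbound);
        session.transfer(outbound, REL_SUCCESS);

        session.remove(pathFlowFile);                         // the key step: account for the inbound FlowFile
    }
    session.commit();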

I certainly learned a lot from this experience and appreciate all the guidance and training.  Hopefully this thread will help others in the future.

I agree with Joe's earlier comment - the error message wasn't especially helpful.  If the error message had printed out the offending record it may have been more obvious that the issue was related to the inbound flowfile/relationship instead of the outbound one.  The solution had nothing to do with "transfer", so the current message was even more misleading.

What might help in the future would be an inventory of NiFi exceptions, along with an explanation of likely causes and recommendations for resolving them.  I realize this is a lot of work to document, but as the project matures, this kind of thing, along with an API reference guide, would be very helpful for newbie developers.

Thanks again!

On my way again...

Rick

-----Original Message-----
From: Rick Braddy [mailto:rbraddy@softnas.com] 
Sent: Saturday, September 12, 2015 3:31 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Mark,

Okay. That helps a lot!!

The thing is, like the original GetFile, the new processor can read and then delete the original file (it's still an option in case the use-case is to "migrate" the files instead of "replicate" them).

So given that twist, it sounds like session.remove() may make the most sense, similar to the original GetFile processor.

Thank you very much for this succinct explanation.  Not sure why I couldn't figure those things out from reading the Developer Guide - perhaps some examples or similar dialog to the below would help others.

Thanks!
Rick

-----Original Message-----
From: Mark Payne [mailto:markap14@hotmail.com]
Sent: Saturday, September 12, 2015 3:26 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Rick,
Some great questions!
1. If session.remove() is called, and then the session is rolled back, the original FlowFile will be requeued so that it can be re-processed.
2. If you call session.importFrom, it will overwrite the contents of the FlowFile. However, this is done in such a way that if you overwrite it and then call session.rollback, again the original FlowFile will be re-queued, not the modified version.
3. re: @SupportsBatching. There's not necessarily a lot of overhead in calling onTrigger() repeatedly, but rather the overhead lives within session.commit(). When you use the @SupportsBatching annotation, and then configure the Processor to batch, for say 25 ms (in the Scheduling Tab of the Processor Configuration dialog), what you are doing is telling the framework that calls to session.commit() can be done asynchronously, essentially, so that we can wait up to 25 milliseconds and then perform a single update to the FlowFile and Provenance Repositories that includes all of the information for those sessions. So it is the session commit that is batched up.
GetFile doesn't use this approach because if it did this, it could pull in a FlowFile, call session.commit (which may not actually update the repositories if the updates are batched), and then delete the original file. At this point, if the node died, it would have lost that session commit (which means that the file will not be in the flow on restart) AND it will have already deleted the file from disk.
So in order to avoid that, it does not support batching. When we call session.commit(), we know that the repositories are updated and the FlowFile is safe within NiFi. Now that NiFi is responsible for handling the file, we can delete the original.
In your case, you can support batching because you're not deleting the original. So if you haven't yet committed the session and the node dies, it's okay; you can just re-process the file.
Does that make sense?  If not, please let me know because this isn't an easy concept necessarily to communicate, but it needs to be communicated well.
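Put together, option 3 with batching might look something like this (a sketch only; the class skeleton, REL_SUCCESS and the readPathFromContent() helper are placeholders, and the relationship/property declarations are omitted):

    @SupportsBatching   // safe here only because the source file is not deleted
    public class GetFileData extends AbstractProcessor {
        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            FlowFile flowFile = session.get();                 // a single FlowFile per invocation
            if (flowFile == null) {
                return;
            }
            // readPathFromContent() is a hypothetical helper that does the session.read()
            // loop shown earlier to pull the file path out of the FlowFile's content
            final Path sourcePath = readPathFromContent(session, flowFile);

            // option 3: overwrite the incoming FlowFile's content in place - no child FlowFile, no remove()
            flowFile = session.importFrom(sourcePath, true, flowFile);
            session.transfer(flowFile, REL_SUCCESS);
            // no explicit session.commit() here: AbstractProcessor commits when onTrigger returns,
            // and @SupportsBatching lets the framework batch those commits up to the configured duration
        }
    }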
Thanks-Mark

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified
> (FlowFileHandlingException)
> Date: Sat, 12 Sep 2015 20:12:35 +0000
> 
> Mark,
> 
> That's exactly the advice and guidance that I needed.  A couple of questions for #2 and #3 cases.
> 
> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
> 
> >> when session.remove() is called and a session.rollback() occurs due to an error, does the remove operation also get rolled back so the original inbound flowfiles are available to be reprocessed?
> 
> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile.
> 
> >> I agree this makes a lot of sense and would simplify things 
> >> greatly.  In this case, would session.importFrom overwrite the 
> >> contents of the original inbound flowfile or append to it?  (need a 
> >> way to replace the file path with the file's contents)
> 
> >> Is there a significant amount of context-switch overhead in onTrigger() being called repeatedly vs. batching/queueing?  Is this effectively what @SupportsBatching does, to make multiple immediate calls back to onTrigger() before yielding to other threads? 
> 
> >> Why does the GetFile processor use the internal batching/queuing approach?
> 
> Thank you.
> Rick
> 
> -----Original Message-----
> From: Mark Payne [mailto:markap14@hotmail.com]
> Sent: Saturday, September 12, 2015 2:59 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified
> (FlowFileHandlingException)
> 
> Rick,
> You must account for all FlowFiles that you pull in from queues. You can account for a FlowFile by either removing it or transferring it. In terms of what to do with the incoming FlowFiles, you have three options, really:
> 1. Transfer to an 'original' relationship. If you think this will be an "unused" relationship, then this is probably not a good option for you. I would never recommend creating a relationship that you don't believe will be useful.
> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile. This will then modify the content of the FlowFile, rather than creating a child. The FlowFile would then just be routed to success.
> Regarding batching: generally for this type of Processor, I would recommend you do not pull in 10 FlowFiles at a time and keep a collection of them. Rather, I would pull in a single FlowFile and process it. I would then use the @SupportsBatching annotation. This allows the operator to make the trade-off of throughput vs. latency as appropriate for the particular flow. This approach also allows the Processor's code to be a little simpler, as it doesn't need to maintain Collections of FlowFiles - it just operates on a single FlowFile at a time.
> Hope this helps! Let us know if you have any further questions or issues.
> Thanks-Mark
> 
> > From: rbraddy@softnas.com
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> > (FlowFileHandlingException)
> > Date: Sat, 12 Sep 2015 18:16:48 +0000
> > 
> > I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.
> > 
> > Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.
> > 
> > Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.
> > 
> > Questions:
> > 
> > 1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?
> > 
> > 2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?
> > 
> > 3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
> > 
> > Thanks
> > Rick
> > 
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Saturday, September 12, 2015 8:50 AM
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> > (FlowFileHandlingException)
> > 
> > More information... Upon further debugging within the StandardProcessSession checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.
> > 
> > The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:
> > 
> >        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
> >         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
> >         for (final StandardRepositoryRecord record : records.values()) {
> >             if (record.isMarkedForDelete()) {
> >                 continue;
> >             }
> >             final Relationship relationship = record.getTransferRelationship();
> >          if (relationship == null) {
> >                 rollback();
> >  -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
> >             }
> > 
> > I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.
> > 
> > Rick
> > 
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Saturday, September 12, 2015 7:42 AM
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> > (FlowFileHandlingException)
> > 
> > Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.
> > 
> > Rick
> > 
> > -----Original Message-----
> > From: Joe Witt [mailto:joe.witt@gmail.com]
> > Sent: Saturday, September 12, 2015 7:41 AM
> > To: dev@nifi.apache.org
> > Subject: Re: Transfer relationship not specified
> > (FlowFileHandlingException)
> > 
> > Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.
> > 
> > On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> > > Joe,
> > >
> > > Replies below.
> > >
> > > Rick
> > >
> > >
> > > -----Original Message-----
> > > From: Joe Witt [mailto:joe.witt@gmail.com]
> > > Sent: Saturday, September 12, 2015 7:02 AM
> > > To: dev@nifi.apache.org
> > > Subject: Re: Transfer relationship not specified
> > > (FlowFileHandlingException)
> > >
> > > Rick
> > >
> > > Can you show what is happening in the exception handling part of your code as well?
> > >
> > > Yes.  In this version, sending (empty) flowfiles for directory entries is bypassed so only files get processed, and setting attributes is also disabled (which did not help).
> > >
> > > Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
> > >
> > > Here's the entire onTrigger method:
> > >
> > >     @Override
> > >     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
> > >         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
> > >         final ProcessorLog logger = getLogger();
> > >
> > >         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
> > >         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
> > >             try {
> > >                 final Set<File> filelist = getFileList(context, 
> > > session);
> > >
> > >                 queueLock.lock();
> > >                 try {
> > >                     filelist.removeAll(inProcess);
> > >                     if (!keepingSourceFile) {
> > >                         filelist.removeAll(recentlyProcessed);
> > >                     }
> > >
> > >                     fileQueue.clear();
> > >                     fileQueue.addAll(filelist);
> > >
> > >                     queueLastUpdated.set(System.currentTimeMillis());
> > >                     recentlyProcessed.clear();
> > >
> > >                     if (filelist.isEmpty()) {
> > >                         context.yield();
> > >                     }
> > >                 } finally {
> > >                     queueLock.unlock();
> > >                 }
> > >             } finally {
> > >                 filelistLock.unlock();
> > >             }
> > >         }
> > >
> > >         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
> > >         final List<File> files = new ArrayList<>(batchSize);
> > >         queueLock.lock();
> > >         try {
> > >             fileQueue.drainTo(files, batchSize);
> > >             if (files.isEmpty()) {
> > >                 return;
> > >             } else {
> > >                 inProcess.addAll(files);
> > >             }
> > >         } finally {
> > >             queueLock.unlock();
> > >         }
> > >
> > >         final ListIterator<File> itr = files.listIterator();
> > >         FlowFile flowFile = null;
> > >         try {
> > >             while (itr.hasNext()) {
> > >                 final File file = itr.next();
> > >                 final Path filePath = file.toPath();
> > >                 final Path relativePath = filePath.relativize(filePath.getParent());
> > >                 String relativePathString = relativePath.toString() + "/";
> > >                 if (relativePathString.isEmpty()) {
> > >                     relativePathString = "./";
> > >                 }
> > >                 final Path absPath = filePath.toAbsolutePath();
> > >                 final String absPathString =
> > > absPath.getParent().toString() + "/";
> > >
> > >                 final long importStart = System.nanoTime();
> > >                 String fileType = "directory";
> > >                 if (file.isFile()){
> > >                     fileType = "file";
> > >                     flowFile = session.create();
> > >                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> > >                 }
> > >                 else
> > >                 {
> > >                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
> > >                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
> > >                 }
> > >
> > >                 final long importNanos = System.nanoTime() - importStart;
> > >                 final long importMillis = 
> > > TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> > >
> > >               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> > >               //  Map<String, String> attributes = getAttributesFromFile(filePath);
> > >               //  if (attributes.size() > 0) {
> > >               //      flowFile = session.putAllAttributes(flowFile, attributes);
> > >               //  }
> > >
> > >                 final String fileURI = file.toURI().toString();
> > >                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
> > >                 session.transfer(flowFile, REL_SUCCESS);
> > >                 logger.info("added {} to flow", new 
> > > Object[]{flowFile});
> > >
> > >                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> > >                     queueLock.lock();
> > >                     try {
> > >                         while (itr.hasNext()) {
> > >                             final File nextFile = itr.next();
> > >                             fileQueue.add(nextFile);
> > >                             inProcess.remove(nextFile);
> > >                         }
> > >                     } finally {
> > >                         queueLock.unlock();
> > >                     }
> > >                 }
> > >             }
> > >             session.commit();
> > >         } catch (final Exception e) {
> > >             logger.error("Failed to transfer files due to {}", e);
> > >             context.yield();
> > >
> > >
> > >             // anything that we've not already processed needs to be put back on the queue
> > >             if (flowFile != null) {
> > >                 session.remove(flowFile);
> > >             }
> > >         } finally {
> > >             queueLock.lock();
> > >             try {
> > >                 inProcess.removeAll(files);
> > >                 recentlyProcessed.addAll(files);
> > >             } finally {
> > >                 queueLock.unlock();
> > >             }
> > >         }
> > >     }
> > >
> > > }
> > >
> > > Also please confirm which codebase you're running against.  Latest HEAD of master?
> > >
> > > I'm using a snap from GitHub that's several weeks old from August 
> > > 25th (it's working fine with the original GetFile processor, which 
> > > this code was derived from)
> > >
> > > Thanks
> > > Joe
> > >
> > > On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> > >> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
> > >>
> > >> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
> > >>
> > >> Not being intimate with the internals of the framework yet, not sure what would cause this.
> > >>
> > >> Rick
> > >>
> > >> -----Original Message-----
> > >> From: Rick Braddy
> > >> Sent: Friday, September 11, 2015 8:26 PM
> > >> To: dev@nifi.apache.org
> > >> Subject: RE: Transfer relationship not specified
> > >> (FlowFileHandlingException)
> > >>
> > >> Mark,
> > >>
> > >> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
> > >>
> > >> Rick
> > >>
> > >>        final ListIterator<File> itr = files.listIterator();
> > >>         FlowFile flowFile = null;
> > >>         try {
> > >>             while (itr.hasNext()) {
> > >>                 final File file = itr.next();
> > >>                 final Path filePath = file.toPath();
> > >>                 final Path relativePath = filePath.relativize(filePath.getParent());
> > >>                 String relativePathString = relativePath.toString() + "/";
> > >>                 if (relativePathString.isEmpty()) {
> > >>                     relativePathString = "./";
> > >>                 }
> > >>                 final Path absPath = filePath.toAbsolutePath();
> > >>                 final String absPathString =
> > >> absPath.getParent().toString() + "/";
> > >>
> > >>                 final long importStart = System.nanoTime();
> > >>                 String fileType = "directory";
> > >>                 flowFile = session.create();
> > >>                 if (file.isFile()){
> > >>                     fileType = "file";
> > >>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> > >>                 }
> > >>                 final long importNanos = System.nanoTime() - importStart;
> > >>                 final long importMillis = 
> > >> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> > >>
> > >>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> > >>                 Map<String, String> attributes = getAttributesFromFile(filePath);
> > >>                 if (attributes.size() > 0) {
> > >>                     flowFile = session.putAllAttributes(flowFile, attributes);
> > >>                 }
> > >>
> > >>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
> > >>                 session.transfer(flowFile, REL_SUCCESS);
> > >>                 logger.info("added {} to flow", new 
> > >> Object[]{flowFile});
> > >>
> > >>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> > >>                     queueLock.lock();
> > >>                     try {
> > >>                         while (itr.hasNext()) {
> > >>                             final File nextFile = itr.next();
> > >>                             fileQueue.add(nextFile);
> > >>                             inProcess.remove(nextFile);
> > >>                         }
> > >>                     } finally {
> > >>                         queueLock.unlock();
> > >>                     }
> > >>                 }
> > >>             }
> > >>             session.commit();
> > >>         } catch (final Exception e) {
> > >>             logger.error("Failed to transfer files due to {}", 
> > >> e);
> > >>
> > >> -----Original Message-----
> > >> From: Mark Payne [mailto:markap14@hotmail.com]
> > >> Sent: Friday, September 11, 2015 6:39 PM
> > >> To: dev@nifi.apache.org
> > >> Subject: RE: Transfer relationship not specified
> > >> (FlowFileHandlingException)
> > >>
> > >> Rick,
> > >> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> > >> I.e., there was never a call to session.transfer() for that FlowFile.
> > >> Thanks-Mark
> > >>
> > >>> From: rbraddy@softnas.com
> > >>> To: dev@nifi.apache.org
> > >>> Subject: RE: Transfer relationship not specified
> > >>> (FlowFileHandlingException)
> > >>> Date: Fri, 11 Sep 2015 23:25:33 +0000
> > >>>
> > >>> Some more details:
> > >>>
> > >>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> > >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> > >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> > >>>
> > >>> So it seems there's some issue with each of the FlowFiles...
> > >>>
> > >>> -----Original Message-----
> > >>> From: Rick Braddy [mailto:rbraddy@softnas.com]
> > >>> Sent: Friday, September 11, 2015 6:00 PM
> > >>> To: dev@nifi.apache.org
> > >>> Subject: Transfer relationship not specified
> > >>> (FlowFileHandlingException)
> > >>>
> > >>> Hi,
> > >>>
> > >>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> > >>>
> > >>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> > >>>
> > >>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in same way as original processor did it, and the success relationship out is connected to another processor input as it should be.
> > >>>
> > >>> Any suggestions for troubleshooting?
> > >>>
> > >>> Rick
> > >>>
> > >>>
> > >>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
Mark,

Okay. That helps a lot!!

The thing is, like the original GetFile, the new processor can read and then delete the original file (it's still an option in case the use-case is to "migrate" the files instead of "replicate" them).

So given that twist, it sounds like session.remove() may make the most sense, similar to the original GetFile processor.

Thank you very much for this succinct explanation.  Not sure why I couldn't figure those things out from reading the Developer Guide - perhaps some examples or a dialog similar to the one below would help others.

Thanks!
Rick
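
A minimal sketch of that "option 2" pattern (create a child FlowFile from the incoming one, import the file's contents into the child, transfer the child, remove the parent) could look like the following. It assumes the same surrounding class as the GetFileData code quoted elsewhere in this thread (an AbstractProcessor subclass with a REL_SUCCESS relationship) plus the usual imports (java.nio.file.*, java.util.concurrent.atomic.AtomicReference, org.apache.nifi.processor.io.InputStreamCallback, org.apache.commons.io.IOUtils); the keepSourceFile flag is hard-coded here only for illustration.

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Pull a single incoming FlowFile whose content is a file path.
        final FlowFile incoming = session.get();
        if (incoming == null) {
            return;
        }

        // Read the path out of the incoming FlowFile's content.
        final AtomicReference<String> pathRef = new AtomicReference<>();
        session.read(incoming, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                pathRef.set(IOUtils.toString(in, StandardCharsets.UTF_8).trim());
            }
        });

        final Path filePath = Paths.get(pathRef.get());
        final boolean keepSourceFile = false;  // illustrative; normally read from a processor property

        // Create a child of the incoming FlowFile, fill it with the file's bytes,
        // send the child to success, and remove the parent so that every FlowFile
        // obtained via session.get() is accounted for before commit.
        FlowFile child = session.create(incoming);
        child = session.importFrom(filePath, keepSourceFile, child);
        session.getProvenanceReporter().receive(child, filePath.toUri().toString());
        session.transfer(child, REL_SUCCESS);
        session.remove(incoming);
        // If anything above throws, the framework rolls the session back and the
        // incoming FlowFile is requeued with its original content.
    }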

-----Original Message-----
From: Mark Payne [mailto:markap14@hotmail.com] 
Sent: Saturday, September 12, 2015 3:26 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Rick,
Some great questions!
1. If session.remove() is called, and then the session is rolled back, the original FlowFile will be requeued so that it can be re-processed.
2. If you call session.importFrom, it will overwrite the contents of the FlowFile. However, this is done in such a way that if you overwrite it and then call session.rollback, again the original FlowFile will be re-queued, not the modified version.
3. re: @SupportsBatching. There's not necessarily a lot of overhead in calling onTrigger() repeatedly, but rather the overhead lives within session.commit(). When you use the @SupportsBatching annotation, and then configure the Processor to batch, for say 25 ms (in the Scheduling Tab of the Processor Configuration dialog), what you are doing is telling the framework that calls to session.commit() can be done asynchronously, essentially, so that we can wait up to 25 milliseconds and then perform a single update to the FlowFile and Provenance Repositories that includes all of the information for those sessions. So it is the session commit that is batched up.
GetFile doesn't use this approach because if it did this, it could pull in a FlowFile, call session.commit (which may not actually update the repositories if the updates are batched), and then delete the original file. At this point, if the node died, it would have lost that session commit (which means that the file will not be in the flow on restart) AND it will have already deleted the file from disk.
So in order to avoid that, it does not support batching. So when we call session.commit(), we know that the repositories are updated and the FlowFile is safe within NiFi. So now that NiFi is responsible for handling the file, we can delete the original.
In your case, you can support batching because you're not deleting the original. So if you haven't yet committed the session and the node dies, it's okay, you can just re-process the file.
Does that make sense?  If not, please let me know because this isn't necessarily an easy concept to communicate, but it needs to be communicated well.
Thanks-Mark
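
As an aside for anyone hitting the same exception: the nifi-mock TestRunner enforces the same "every FlowFile pulled from the queue must be transferred or removed" rule at commit time, so a small unit test (the kind Joe mentions trying elsewhere in this thread) catches the problem before deployment. A rough sketch, assuming the GetFileData class above exposes REL_SUCCESS as a public static field and the usual JUnit/nifi-mock imports (org.junit.Test, org.apache.nifi.util.TestRunner, org.apache.nifi.util.TestRunners, java.nio.file.Files, java.nio.file.Path, java.nio.charset.StandardCharsets):

    @Test
    public void testEveryInputFlowFileIsAccountedFor() throws IOException {
        final TestRunner runner = TestRunners.newTestRunner(GetFileData.class);
        // runner.setProperty(...) calls for the processor's required properties omitted here.

        // Write a temp file and enqueue a FlowFile whose content is its path.
        final Path tmp = Files.createTempFile("nifi-test", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
        runner.enqueue(tmp.toString().getBytes(StandardCharsets.UTF_8));

        runner.run();

        // The mock session applies the same rule at commit: a FlowFile pulled via
        // session.get() but never transferred or removed fails the test.
        runner.assertQueueEmpty();
        runner.assertAllFlowFilesTransferred(GetFileData.REL_SUCCESS, 1);
    }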

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
> Date: Sat, 12 Sep 2015 20:12:35 +0000
> 
> Mark,
> 
> That's exactly the advice and guidance that I needed.  A couple of questions for #2 and #3 cases.
> 
> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
> 
> >> when session.remove() is called and a session.rollback() occurs due to an error, does the remove operation also get rolled back so the original inbound flowfiles are available to be reprocessed?
> 
> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile.
> 
> >> I agree this makes a lot of sense and would simplify things 
> >> greatly.  In this case, would session.importFrom overwrite the 
> >> contents of the original inbound flowfile or append to it?  (need a 
> >> way to replace the file path with the file's contents)
> 
> >> Is there a significant amount of context-switch overhead in onTrigger() being called repeatedly vs. batching/queueing?  Is this effectively what @SupportsBatching does, to make multiple immediate calls back to onTrigger() before yielding to other threads?
> 
> >> Why does the GetFile processor use the internal batching/queuing approach?
> 
> Thank you.
> Rick
> 
> -----Original Message-----
> From: Mark Payne [mailto:markap14@hotmail.com]
> Sent: Saturday, September 12, 2015 2:59 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
> 
> Rick,
> You must account for all FlowFiles that you pull in from queues. You can account for a FlowFile by either removing it or transferring it. In terms of what to do with the incoming FlowFiles, you have three options, really:
> 1. Transfer to an 'original' relationship. If you think this will be an "unused" relationship, then this is probably not a good option for you. I would never recommend creating a relationship that you don't believe will be useful.
> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile. This will then modify the content of the FlowFile, rather than creating a child. The FlowFile would then just be routed to success.
> Regarding batching: generally for this type of Processor, I would recommend you do not pull in 10 FlowFiles at a time and keep a collection of them. Rather, I would pull in a single FlowFile and process it. I would then use the @SupportsBatching annotation. This allows the operator to make the trade-off of throughput vs. latency as appropriate for the particular flow. This approach also allows the Processor's code to be a little simpler, as it doesn't need to maintain Collections of FlowFiles - it just operates on a single FlowFile at a time.
> Hope this helps! Let us know if you have any further questions or issues.
> Thanks-Mark
> 
> > From: rbraddy@softnas.com
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> > (FlowFileHandlingException)
> > Date: Sat, 12 Sep 2015 18:16:48 +0000
> > 
> > I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.
> > 
> > Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.
> > 
> > Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.
> > 
> > Questions:
> > 
> > 1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?
> > 
> > 2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?
> > 
> > 3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
> > 
> > Thanks
> > Rick
> > 
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Saturday, September 12, 2015 8:50 AM
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> > (FlowFileHandlingException)
> > 
> > More information... Upon further debugging within the StandardProcessSession checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.
> > 
> > The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:
> > 
> >        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
> >         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
> >         for (final StandardRepositoryRecord record : records.values()) {
> >             if (record.isMarkedForDelete()) {
> >                 continue;
> >             }
> >             final Relationship relationship = record.getTransferRelationship();
> >          if (relationship == null) {
> >                 rollback();
> >  -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
> >             }
> > 
> > I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.
> > 
> > Rick
> > 
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Saturday, September 12, 2015 7:42 AM
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified
> > (FlowFileHandlingException)
> > 
> > Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.
> > 
> > Rick
> > 
> > -----Original Message-----
> > From: Joe Witt [mailto:joe.witt@gmail.com]
> > Sent: Saturday, September 12, 2015 7:41 AM
> > To: dev@nifi.apache.org
> > Subject: Re: Transfer relationship not specified
> > (FlowFileHandlingException)
> > 
> > Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.
> > 
> > On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> > > Joe,
> > >
> > > Replies below.
> > >
> > > Rick
> > >
> > >
> > > -----Original Message-----
> > > From: Joe Witt [mailto:joe.witt@gmail.com]
> > > Sent: Saturday, September 12, 2015 7:02 AM
> > > To: dev@nifi.apache.org
> > > Subject: Re: Transfer relationship not specified
> > > (FlowFileHandlingException)
> > >
> > > Rick
> > >
> > > Can you show what is happening in the exception handling part of your code as well?
> > >
> > > Yes.  This version bypasses sending (empty) flowfiles for directory entries so only files get processed, and also has attributes disabled (which did not help).
> > >
> > > Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
> > >
> > > Here's the entire onTrigger method:
> > >
> > >     @Override
> > >     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
> > >         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
> > >         final ProcessorLog logger = getLogger();
> > >
> > >         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
> > >         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
> > >             try {
> > >                 final Set<File> filelist = getFileList(context, 
> > > session);
> > >
> > >                 queueLock.lock();
> > >                 try {
> > >                     filelist.removeAll(inProcess);
> > >                     if (!keepingSourceFile) {
> > >                         filelist.removeAll(recentlyProcessed);
> > >                     }
> > >
> > >                     fileQueue.clear();
> > >                     fileQueue.addAll(filelist);
> > >
> > >                     queueLastUpdated.set(System.currentTimeMillis());
> > >                     recentlyProcessed.clear();
> > >
> > >                     if (filelist.isEmpty()) {
> > >                         context.yield();
> > >                     }
> > >                 } finally {
> > >                     queueLock.unlock();
> > >                 }
> > >             } finally {
> > >                 filelistLock.unlock();
> > >             }
> > >         }
> > >
> > >         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
> > >         final List<File> files = new ArrayList<>(batchSize);
> > >         queueLock.lock();
> > >         try {
> > >             fileQueue.drainTo(files, batchSize);
> > >             if (files.isEmpty()) {
> > >                 return;
> > >             } else {
> > >                 inProcess.addAll(files);
> > >             }
> > >         } finally {
> > >             queueLock.unlock();
> > >         }
> > >
> > >         final ListIterator<File> itr = files.listIterator();
> > >         FlowFile flowFile = null;
> > >         try {
> > >             while (itr.hasNext()) {
> > >                 final File file = itr.next();
> > >                 final Path filePath = file.toPath();
> > >                 final Path relativePath = filePath.relativize(filePath.getParent());
> > >                 String relativePathString = relativePath.toString() + "/";
> > >                 if (relativePathString.isEmpty()) {
> > >                     relativePathString = "./";
> > >                 }
> > >                 final Path absPath = filePath.toAbsolutePath();
> > >                 final String absPathString =
> > > absPath.getParent().toString() + "/";
> > >
> > >                 final long importStart = System.nanoTime();
> > >                 String fileType = "directory";
> > >                 if (file.isFile()){
> > >                     fileType = "file";
> > >                     flowFile = session.create();
> > >                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> > >                 }
> > >                 else
> > >                 {
> > >                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
> > >                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
> > >                 }
> > >
> > >                 final long importNanos = System.nanoTime() - importStart;
> > >                 final long importMillis = 
> > > TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> > >
> > >               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> > >               //  Map<String, String> attributes = getAttributesFromFile(filePath);
> > >               //  if (attributes.size() > 0) {
> > >               //      flowFile = session.putAllAttributes(flowFile, attributes);
> > >               //  }
> > >
> > >                 final String fileURI = file.toURI().toString();
> > >                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
> > >                 session.transfer(flowFile, REL_SUCCESS);
> > >                 logger.info("added {} to flow", new 
> > > Object[]{flowFile});
> > >
> > >                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> > >                     queueLock.lock();
> > >                     try {
> > >                         while (itr.hasNext()) {
> > >                             final File nextFile = itr.next();
> > >                             fileQueue.add(nextFile);
> > >                             inProcess.remove(nextFile);
> > >                         }
> > >                     } finally {
> > >                         queueLock.unlock();
> > >                     }
> > >                 }
> > >             }
> > >             session.commit();
> > >         } catch (final Exception e) {
> > >             logger.error("Failed to transfer files due to {}", e);
> > >             context.yield();
> > >
> > >
> > >             // anything that we've not already processed needs to be put back on the queue
> > >             if (flowFile != null) {
> > >                 session.remove(flowFile);
> > >             }
> > >         } finally {
> > >             queueLock.lock();
> > >             try {
> > >                 inProcess.removeAll(files);
> > >                 recentlyProcessed.addAll(files);
> > >             } finally {
> > >                 queueLock.unlock();
> > >             }
> > >         }
> > >     }
> > >
> > > }
> > >
> > > Also please confirm which codebase you're running against.  Latest HEAD of master?
> > >
> > > I'm using a snap from GitHub that's several weeks old from August 
> > > 25th (it's working fine with the original GetFile processor, which 
> > > this code was derived from)
> > >
> > > Thanks
> > > Joe
> > >
> > > On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> > >> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
> > >>
> > >> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
> > >>
> > >> Not being intimate with the internals of the framework yet, not sure what would cause this.
> > >>
> > >> Rick
> > >>
> > >> -----Original Message-----
> > >> From: Rick Braddy
> > >> Sent: Friday, September 11, 2015 8:26 PM
> > >> To: dev@nifi.apache.org
> > >> Subject: RE: Transfer relationship not specified
> > >> (FlowFileHandlingException)
> > >>
> > >> Mark,
> > >>
> > >> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
> > >>
> > >> Rick
> > >>
> > >>        final ListIterator<File> itr = files.listIterator();
> > >>         FlowFile flowFile = null;
> > >>         try {
> > >>             while (itr.hasNext()) {
> > >>                 final File file = itr.next();
> > >>                 final Path filePath = file.toPath();
> > >>                 final Path relativePath = filePath.relativize(filePath.getParent());
> > >>                 String relativePathString = relativePath.toString() + "/";
> > >>                 if (relativePathString.isEmpty()) {
> > >>                     relativePathString = "./";
> > >>                 }
> > >>                 final Path absPath = filePath.toAbsolutePath();
> > >>                 final String absPathString =
> > >> absPath.getParent().toString() + "/";
> > >>
> > >>                 final long importStart = System.nanoTime();
> > >>                 String fileType = "directory";
> > >>                 flowFile = session.create();
> > >>                 if (file.isFile()){
> > >>                     fileType = "file";
> > >>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> > >>                 }
> > >>                 final long importNanos = System.nanoTime() - importStart;
> > >>                 final long importMillis = 
> > >> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> > >>
> > >>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> > >>                 Map<String, String> attributes = getAttributesFromFile(filePath);
> > >>                 if (attributes.size() > 0) {
> > >>                     flowFile = session.putAllAttributes(flowFile, attributes);
> > >>                 }
> > >>
> > >>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
> > >>                 session.transfer(flowFile, REL_SUCCESS);
> > >>                 logger.info("added {} to flow", new 
> > >> Object[]{flowFile});
> > >>
> > >>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> > >>                     queueLock.lock();
> > >>                     try {
> > >>                         while (itr.hasNext()) {
> > >>                             final File nextFile = itr.next();
> > >>                             fileQueue.add(nextFile);
> > >>                             inProcess.remove(nextFile);
> > >>                         }
> > >>                     } finally {
> > >>                         queueLock.unlock();
> > >>                     }
> > >>                 }
> > >>             }
> > >>             session.commit();
> > >>         } catch (final Exception e) {
> > >>             logger.error("Failed to transfer files due to {}", 
> > >> e);
> > >>
> > >> -----Original Message-----
> > >> From: Mark Payne [mailto:markap14@hotmail.com]
> > >> Sent: Friday, September 11, 2015 6:39 PM
> > >> To: dev@nifi.apache.org
> > >> Subject: RE: Transfer relationship not specified
> > >> (FlowFileHandlingException)
> > >>
> > >> Rick,
> > >> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> > >> I.e., there was never a call to session.transfer() for that FlowFile.
> > >> Thanks-Mark
> > >>
> > >>> From: rbraddy@softnas.com
> > >>> To: dev@nifi.apache.org
> > >>> Subject: RE: Transfer relationship not specified
> > >>> (FlowFileHandlingException)
> > >>> Date: Fri, 11 Sep 2015 23:25:33 +0000
> > >>>
> > >>> Some more details:
> > >>>
> > >>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> > >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> > >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> > >>>
> > >>> So it seems there's some issue with each of the FlowFiles...
> > >>>
> > >>> -----Original Message-----
> > >>> From: Rick Braddy [mailto:rbraddy@softnas.com]
> > >>> Sent: Friday, September 11, 2015 6:00 PM
> > >>> To: dev@nifi.apache.org
> > >>> Subject: Transfer relationship not specified
> > >>> (FlowFileHandlingException)
> > >>>
> > >>> Hi,
> > >>>
> > >>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> > >>>
> > >>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> > >>>
> > >>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in same way as original processor did it, and the success relationship out is connected to another processor input as it should be.
> > >>>
> > >>> Any suggestions for troubleshooting?
> > >>>
> > >>> Rick
> > >>>
> > >>>
> > >>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Mark Payne <ma...@hotmail.com>.
Rick,
Some great questions!
1. If session.remove() is called, and then the session is rolled back, the original FlowFile will be requeued so that it can be re-processed.
2. If you call session.importFrom, it will overwrite the contents of the FlowFile. However, this is done in such a way that if you overwrite it and then call session.rollback, again the original FlowFile will be re-queued, not the modified version.
3. re: @SupportsBatching. There's not necessarily a lot of overhead in calling onTrigger() repeatedly, but rather the overhead lives within session.commit(). When you use the @SupportsBatching annotation, and then configure the Processor to batch, for say 25 ms (in the Scheduling Tab of the Processor Configuration dialog), what you are doing is telling the framework that calls to session.commit() can be done asynchronously, essentially, so that we can wait up to 25 milliseconds and then perform a single update to the FlowFile and Provenance Repositories that includes all of the information for those sessions. So it is the session commit that is batched up.
GetFile doesn't use this approach because if it did this, it could pull in a FlowFile, call session.commit (which may not actually update the repositories if the updates are batched), and then delete the original file. At this point, if the node died, it would have lost that session commit (which means that the file will not be in the flow on restart) AND it will have already deleted the file from disk.
So in order to avoid that, it does not support batching. So when we call session.commit(), we know that the repositories are updated and the FlowFile is safe within NiFi. So now that NiFi is responsible for handling the file, we can delete the original.
In your case, you can support batching because you're not deleting the original. So if you haven't yet committed the session and the node dies, it's okay, you can just re-process the file.
Does that make sense?  If not, please let me know because this isn't necessarily an easy concept to communicate, but it needs to be communicated well.
Thanks-Mark
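
A minimal sketch of the single-FlowFile, option-3 pattern described above, using @SupportsBatching, might look like the following. The class name is hypothetical, the property descriptors and imports (java.util.*, java.nio.file.*, java.util.concurrent.atomic.AtomicReference, org.apache.nifi.annotation.behavior.SupportsBatching, org.apache.nifi.processor.io.InputStreamCallback, org.apache.commons.io.IOUtils) are assumed, and it follows the same shape as the GetFileData code quoted below rather than being the actual implementation discussed here.

    @SupportsBatching
    public class FetchFileContent extends AbstractProcessor {  // hypothetical name

        public static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success")
                .description("FlowFile whose content was replaced with the file's bytes")
                .build();

        @Override
        public Set<Relationship> getRelationships() {
            return Collections.singleton(REL_SUCCESS);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            FlowFile flowFile = session.get();  // one FlowFile per onTrigger call
            if (flowFile == null) {
                return;
            }

            // The incoming FlowFile's content is assumed to be a file path.
            final AtomicReference<String> pathRef = new AtomicReference<>();
            session.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    pathRef.set(IOUtils.toString(in, StandardCharsets.UTF_8).trim());
                }
            });

            // Option 3: overwrite the incoming FlowFile's content in place and route
            // it to success - no child FlowFile, nothing left unaccounted for.
            final Path filePath = Paths.get(pathRef.get());
            flowFile = session.importFrom(filePath, true, flowFile);
            session.getProvenanceReporter().modifyContent(flowFile);  // record the content change
            session.transfer(flowFile, REL_SUCCESS);
            // No explicit session.commit() here: the framework commits after onTrigger,
            // and with @SupportsBatching those commits may be batched per the
            // Scheduling tab's batch duration.
        }
    }

If the session is rolled back before that commit (for example because importFrom throws), the incoming FlowFile is requeued with its original path content, which is why this pattern is safe as long as the source file is not deleted before the commit.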

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> Date: Sat, 12 Sep 2015 20:12:35 +0000
> 
> Mark,
> 
> That's exactly the advice and guidance that I needed.  A couple of questions for #2 and #3 cases.
> 
> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
> 
> >> when session.remove() is called and a session.rollback() occurs due to an error, does the remove operation also get rolled back so the original inbound flowfiles are available to be reprocessed?
> 
> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile.
> 
> >> I agree this makes a lot of sense and would simplify things greatly.  In this case, would session.importFrom overwrite the contents of the original inbound flowfile or append to it?  (need a way to replace the file path with the file's contents)
> 
> >> Is there a significant amount of context-switch overhead in onTrigger() being called repeatedly vs. batching/queueing?  Is this effectively what @SupportsBatching does, to make multiple immediate calls back to onTrigger() before yielding to other threads?
> 
> >> Why does the GetFile processor use the internal batching/queuing approach?
> 
> Thank you.
> Rick
> 
> -----Original Message-----
> From: Mark Payne [mailto:markap14@hotmail.com] 
> Sent: Saturday, September 12, 2015 2:59 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> 
> Rick,
> You must account for all FlowFiles that you pull in from queues. You can account for a FlowFile by either removing it or transferring it. In terms of what to do with the incoming FlowFiles, you have three options, really:
> 1. Transfer to an 'original' relationship. If you think this will be an "unused" relationship, then this is probably not a good option for you. I would never recommend creating a relationship that you don't believe will be useful.
> 2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
> 3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.import with the original FlowFile. This will then modify the content of the FlowFile, rather than creating a child. The FlowFile would then just be routed to success.
> Regarding batching: generally for this type of Processor, I would recommend you do not pull in 10 FlowFiles at a time and keep a collection of them. Rather, I would pull in a single FlowFile and process it. I would then use the @SupportsBatching annotation. This allows the operator to make the trade-off of throughput vs. latency as appropriate for the particular flow. This approach also allows the Processor's code to be a little simpler, as it doesn't need to maintain Collections of FlowFiles - it just operates on a single FlowFile at a time.
> Hope this helps! Let us know if you have any further questions or issues.
> Thanks-Mark
> 
> > From: rbraddy@softnas.com
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified 
> > (FlowFileHandlingException)
> > Date: Sat, 12 Sep 2015 18:16:48 +0000
> > 
> > I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.
> > 
> > Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.
> > 
> > Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.
> > 
> > Questions:
> > 
> > 1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?
> > 
> > 2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?
> > 
> > 3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
> > 
> > Thanks
> > Rick
> > 
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Saturday, September 12, 2015 8:50 AM
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified 
> > (FlowFileHandlingException)
> > 
> > More information... Upon further debugging within the StandardProcessSession checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.
> > 
> > The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:
> > 
> >        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
> >         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
> >         for (final StandardRepositoryRecord record : records.values()) {
> >             if (record.isMarkedForDelete()) {
> >                 continue;
> >             }
> >             final Relationship relationship = record.getTransferRelationship();
> >          if (relationship == null) {
> >                 rollback();
> >  -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
> >             }
> > 
> > I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.
> > 
> > Rick
> > 
> > -----Original Message-----
> > From: Rick Braddy [mailto:rbraddy@softnas.com]
> > Sent: Saturday, September 12, 2015 7:42 AM
> > To: dev@nifi.apache.org
> > Subject: RE: Transfer relationship not specified 
> > (FlowFileHandlingException)
> > 
> > Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.
> > 
> > Rick
> > 
> > -----Original Message-----
> > From: Joe Witt [mailto:joe.witt@gmail.com]
> > Sent: Saturday, September 12, 2015 7:41 AM
> > To: dev@nifi.apache.org
> > Subject: Re: Transfer relationship not specified 
> > (FlowFileHandlingException)
> > 
> > Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.
> > 
> > On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> > > Joe,
> > >
> > > Replies below.
> > >
> > > Rick
> > >
> > >
> > > -----Original Message-----
> > > From: Joe Witt [mailto:joe.witt@gmail.com]
> > > Sent: Saturday, September 12, 2015 7:02 AM
> > > To: dev@nifi.apache.org
> > > Subject: Re: Transfer relationship not specified
> > > (FlowFileHandlingException)
> > >
> > > Rick
> > >
> > > Can you show what is happening in the exception handling part of your code as well?
> > >
> > > Yes.  This version bypasses sending (empty) flowfiles for directory entries so only files get processed, and also has attributes disabled (which did not help).
> > >
> > > Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
> > >
> > > Here's the entire onTrigger method:
> > >
> > >     @Override
> > >     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
> > >         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
> > >         final ProcessorLog logger = getLogger();
> > >
> > >         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
> > >         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
> > >             try {
> > >                 final Set<File> filelist = getFileList(context, 
> > > session);
> > >
> > >                 queueLock.lock();
> > >                 try {
> > >                     filelist.removeAll(inProcess);
> > >                     if (!keepingSourceFile) {
> > >                         filelist.removeAll(recentlyProcessed);
> > >                     }
> > >
> > >                     fileQueue.clear();
> > >                     fileQueue.addAll(filelist);
> > >
> > >                     queueLastUpdated.set(System.currentTimeMillis());
> > >                     recentlyProcessed.clear();
> > >
> > >                     if (filelist.isEmpty()) {
> > >                         context.yield();
> > >                     }
> > >                 } finally {
> > >                     queueLock.unlock();
> > >                 }
> > >             } finally {
> > >                 filelistLock.unlock();
> > >             }
> > >         }
> > >
> > >         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
> > >         final List<File> files = new ArrayList<>(batchSize);
> > >         queueLock.lock();
> > >         try {
> > >             fileQueue.drainTo(files, batchSize);
> > >             if (files.isEmpty()) {
> > >                 return;
> > >             } else {
> > >                 inProcess.addAll(files);
> > >             }
> > >         } finally {
> > >             queueLock.unlock();
> > >         }
> > >
> > >         final ListIterator<File> itr = files.listIterator();
> > >         FlowFile flowFile = null;
> > >         try {
> > >             while (itr.hasNext()) {
> > >                 final File file = itr.next();
> > >                 final Path filePath = file.toPath();
> > >                 final Path relativePath = filePath.relativize(filePath.getParent());
> > >                 String relativePathString = relativePath.toString() + "/";
> > >                 if (relativePathString.isEmpty()) {
> > >                     relativePathString = "./";
> > >                 }
> > >                 final Path absPath = filePath.toAbsolutePath();
> > >                 final String absPathString =
> > > absPath.getParent().toString() + "/";
> > >
> > >                 final long importStart = System.nanoTime();
> > >                 String fileType = "directory";
> > >                 if (file.isFile()){
> > >                     fileType = "file";
> > >                     flowFile = session.create();
> > >                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> > >                 }
> > >                 else
> > >                 {
> > >                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
> > >                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
> > >                 }
> > >
> > >                 final long importNanos = System.nanoTime() - importStart;
> > >                 final long importMillis = 
> > > TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> > >
> > >               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> > >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> > >               //  Map<String, String> attributes = getAttributesFromFile(filePath);
> > >               //  if (attributes.size() > 0) {
> > >               //      flowFile = session.putAllAttributes(flowFile, attributes);
> > >               //  }
> > >
> > >                 final String fileURI = file.toURI().toString();
> > >                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
> > >                 session.transfer(flowFile, REL_SUCCESS);
> > >                 logger.info("added {} to flow", new 
> > > Object[]{flowFile});
> > >
> > >                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> > >                     queueLock.lock();
> > >                     try {
> > >                         while (itr.hasNext()) {
> > >                             final File nextFile = itr.next();
> > >                             fileQueue.add(nextFile);
> > >                             inProcess.remove(nextFile);
> > >                         }
> > >                     } finally {
> > >                         queueLock.unlock();
> > >                     }
> > >                 }
> > >             }
> > >             session.commit();
> > >         } catch (final Exception e) {
> > >             logger.error("Failed to transfer files due to {}", e);
> > >             context.yield();
> > >
> > >
> > >             // anything that we've not already processed needs to be put back on the queue
> > >             if (flowFile != null) {
> > >                 session.remove(flowFile);
> > >             }
> > >         } finally {
> > >             queueLock.lock();
> > >             try {
> > >                 inProcess.removeAll(files);
> > >                 recentlyProcessed.addAll(files);
> > >             } finally {
> > >                 queueLock.unlock();
> > >             }
> > >         }
> > >     }
> > >
> > > }
> > >
> > > Also please confirm which codebase you're running against.  Latest HEAD of master?
> > >
> > > I'm using a snap from GitHub that's several weeks old from August 
> > > 25th (it's working fine with the original GetFile processor, which 
> > > this code was derived from)
> > >
> > > Thanks
> > > Joe
> > >
> > > On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> > >> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
> > >>
> > >> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
> > >>
> > >> Not being intimate with the internals of the framework yet, not sure what would cause this.
> > >>
> > >> Rick
> > >>
> > >> -----Original Message-----
> > >> From: Rick Braddy
> > >> Sent: Friday, September 11, 2015 8:26 PM
> > >> To: dev@nifi.apache.org
> > >> Subject: RE: Transfer relationship not specified
> > >> (FlowFileHandlingException)
> > >>
> > >> Mark,
> > >>
> > >> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
> > >>
> > >> Rick
> > >>
> > >>        final ListIterator<File> itr = files.listIterator();
> > >>         FlowFile flowFile = null;
> > >>         try {
> > >>             while (itr.hasNext()) {
> > >>                 final File file = itr.next();
> > >>                 final Path filePath = file.toPath();
> > >>                 final Path relativePath = filePath.relativize(filePath.getParent());
> > >>                 String relativePathString = relativePath.toString() + "/";
> > >>                 if (relativePathString.isEmpty()) {
> > >>                     relativePathString = "./";
> > >>                 }
> > >>                 final Path absPath = filePath.toAbsolutePath();
> > >>                 final String absPathString =
> > >> absPath.getParent().toString() + "/";
> > >>
> > >>                 final long importStart = System.nanoTime();
> > >>                 String fileType = "directory";
> > >>                 flowFile = session.create();
> > >>                 if (file.isFile()){
> > >>                     fileType = "file";
> > >>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> > >>                 }
> > >>                 final long importNanos = System.nanoTime() - importStart;
> > >>                 final long importMillis = 
> > >> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> > >>
> > >>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> > >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> > >>                 Map<String, String> attributes = getAttributesFromFile(filePath);
> > >>                 if (attributes.size() > 0) {
> > >>                     flowFile = session.putAllAttributes(flowFile, attributes);
> > >>                 }
> > >>
> > >>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
> > >>                 session.transfer(flowFile, REL_SUCCESS);
> > >>                 logger.info("added {} to flow", new 
> > >> Object[]{flowFile});
> > >>
> > >>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> > >>                     queueLock.lock();
> > >>                     try {
> > >>                         while (itr.hasNext()) {
> > >>                             final File nextFile = itr.next();
> > >>                             fileQueue.add(nextFile);
> > >>                             inProcess.remove(nextFile);
> > >>                         }
> > >>                     } finally {
> > >>                         queueLock.unlock();
> > >>                     }
> > >>                 }
> > >>             }
> > >>             session.commit();
> > >>         } catch (final Exception e) {
> > >>             logger.error("Failed to transfer files due to {}", e);
> > >>
> > >> -----Original Message-----
> > >> From: Mark Payne [mailto:markap14@hotmail.com]
> > >> Sent: Friday, September 11, 2015 6:39 PM
> > >> To: dev@nifi.apache.org
> > >> Subject: RE: Transfer relationship not specified
> > >> (FlowFileHandlingException)
> > >>
> > >> Rick,
> > >> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> > >> I.e., there was never a call to session.transfer() for that FlowFile.
> > >> Thanks-Mark
> > >>
> > >>> From: rbraddy@softnas.com
> > >>> To: dev@nifi.apache.org
> > >>> Subject: RE: Transfer relationship not specified
> > >>> (FlowFileHandlingException)
> > >>> Date: Fri, 11 Sep 2015 23:25:33 +0000
> > >>>
> > >>> Some more details:
> > >>>
> > >>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> > >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> > >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> > >>>
> > >>> So it seems there's some issue with each of the FlowFiles...
> > >>>
> > >>> -----Original Message-----
> > >>> From: Rick Braddy [mailto:rbraddy@softnas.com]
> > >>> Sent: Friday, September 11, 2015 6:00 PM
> > >>> To: dev@nifi.apache.org
> > >>> Subject: Transfer relationship not specified
> > >>> (FlowFileHandlingException)
> > >>>
> > >>> Hi,
> > >>>
> > >>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> > >>>
> > >>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> > >>>
> > >>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship out is connected to another processor input as it should be.
> > >>>
> > >>> Any suggestions for troubleshooting?
> > >>>
> > >>> Rick
> > >>>
> > >>>
> > >>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
Mark,

That's exactly the advice and guidance that I needed.  A couple of questions for #2 and #3 cases.

2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.

>> when session.remove() is called and a session.rollback() occurs due to an error, does the remove operation also get rolled back so the original inbound flowfiles are available to be reprocessed?

3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.importFrom() with the original FlowFile.

>> I agree this makes a lot of sense and would simplify things greatly.  In this case, would session.importFrom overwrite the contents of the original inbound flowfile or append to it?  (need a way to replace the file path with the file's contents)

>> Is there a significant amount of context-switch overhead in onTrigger() being called repeatedly vs. batching/queueing?  Is this effectively what @SupportsBatching does, to make multiple immediate calls back to onTrigger() before yielding to other threads?

>> Why does the GetFile processor use the internal batching/queuing approach?
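
>> To make sure I'm reading option 2 correctly, here is the rough shape of it as I understand it (untested; filePath, keepSourceFile and REL_SUCCESS are the same names my processor already uses):

        FlowFile parent = session.get();
        if (parent == null) {
            return;
        }
        FlowFile child = session.create(parent);                      // child keeps lineage to the parent
        child = session.importFrom(filePath, keepSourceFile, child);  // replaces the child's content with the file's bytes
        session.transfer(child, REL_SUCCESS);
        session.remove(parent);                                       // my rollback question above applies to this remove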

Thank you.
Rick

-----Original Message-----
From: Mark Payne [mailto:markap14@hotmail.com] 
Sent: Saturday, September 12, 2015 2:59 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Rick,
You must account for all FlowFiles that you pull in from queues. You can account for a FlowFile by either removing it or transferring it. In terms of what to do with the incoming FlowFiles, you have three options, really:
1. Transfer to an 'original' relationship. If you think this will be an "unused" relationship, then this is probably not a good option for you. I would never recommend creating a relationship that you don't believe will be useful.
2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.importFrom() with the original FlowFile. This will then modify the content of the FlowFile, rather than creating a child. The FlowFile would then just be routed to success.
Regarding batching: generally for this type of Processor, I would recommend you do not pull in 10 FlowFiles at a time and keep a collection of them. Rather, I would pull in a single FlowFile and process it. I would then use the @SupportsBatching annotation. This allows the operator to make the trade-off of throughput vs. latency as appropriate for the particular flow. This approach also allows the Processor's code to be a little simpler, as it doesn't need to maintain Collections of FlowFiles - it just operates on a single FlowFile at a time.
Hope this helps! Let us know if you have any further questions or issues.
Thanks-Mark

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
> Date: Sat, 12 Sep 2015 18:16:48 +0000
> 
> I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.
> 
> Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.
> 
> Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.
> 
> Questions:
> 
> 1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?
> 
> 2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?
> 
> 3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
> 
> Thanks
> Rick
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com]
> Sent: Saturday, September 12, 2015 8:50 AM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
> 
> More information... Upon further debugging within the StandardProcessSession.checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.
> 
> The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:
> 
>        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
>         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
>         for (final StandardRepositoryRecord record : records.values()) {
>             if (record.isMarkedForDelete()) {
>                 continue;
>             }
>             final Relationship relationship = record.getTransferRelationship();
>          if (relationship == null) {
>                 rollback();
>  -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
>             }
> 
> I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.
> 
> Rick
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com]
> Sent: Saturday, September 12, 2015 7:42 AM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
> 
> Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.
> 
> Rick
> 
> -----Original Message-----
> From: Joe Witt [mailto:joe.witt@gmail.com]
> Sent: Saturday, September 12, 2015 7:41 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified 
> (FlowFileHandlingException)
> 
> Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.
> 
> On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> > Joe,
> >
> > Replies below.
> >
> > Rick
> >
> >
> > -----Original Message-----
> > From: Joe Witt [mailto:joe.witt@gmail.com]
> > Sent: Saturday, September 12, 2015 7:02 AM
> > To: dev@nifi.apache.org
> > Subject: Re: Transfer relationship not specified
> > (FlowFileHandlingException)
> >
> > Rick
> >
> > Can you show what is happening in the exception handling part of your code as well?
> >
> > Yes.  This version bypasses sending (empty) flowfiles for directory entries so only files get processed, and also has the attribute-setting calls disabled (which did not help).
> >
> > Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
> >
> > Here's the entire onTrigger method:
> >
> >     @Override
> >     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
> >         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
> >         final ProcessorLog logger = getLogger();
> >
> >         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
> >         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
> >             try {
> >                 final Set<File> filelist = getFileList(context, 
> > session);
> >
> >                 queueLock.lock();
> >                 try {
> >                     filelist.removeAll(inProcess);
> >                     if (!keepingSourceFile) {
> >                         filelist.removeAll(recentlyProcessed);
> >                     }
> >
> >                     fileQueue.clear();
> >                     fileQueue.addAll(filelist);
> >
> >                     queueLastUpdated.set(System.currentTimeMillis());
> >                     recentlyProcessed.clear();
> >
> >                     if (filelist.isEmpty()) {
> >                         context.yield();
> >                     }
> >                 } finally {
> >                     queueLock.unlock();
> >                 }
> >             } finally {
> >                 filelistLock.unlock();
> >             }
> >         }
> >
> >         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
> >         final List<File> files = new ArrayList<>(batchSize);
> >         queueLock.lock();
> >         try {
> >             fileQueue.drainTo(files, batchSize);
> >             if (files.isEmpty()) {
> >                 return;
> >             } else {
> >                 inProcess.addAll(files);
> >             }
> >         } finally {
> >             queueLock.unlock();
> >         }
> >
> >         final ListIterator<File> itr = files.listIterator();
> >         FlowFile flowFile = null;
> >         try {
> >             while (itr.hasNext()) {
> >                 final File file = itr.next();
> >                 final Path filePath = file.toPath();
> >                 final Path relativePath = filePath.relativize(filePath.getParent());
> >                 String relativePathString = relativePath.toString() + "/";
> >                 if (relativePathString.isEmpty()) {
> >                     relativePathString = "./";
> >                 }
> >                 final Path absPath = filePath.toAbsolutePath();
> >                 final String absPathString =
> > absPath.getParent().toString() + "/";
> >
> >                 final long importStart = System.nanoTime();
> >                 String fileType = "directory";
> >                 if (file.isFile()){
> >                     fileType = "file";
> >                     flowFile = session.create();
> >                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> >                 }
> >                 else
> >                 {
> >                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
> >                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
> >                 }
> >
> >                 final long importNanos = System.nanoTime() - importStart;
> >                 final long importMillis = 
> > TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> >
> >               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> >               //  Map<String, String> attributes = getAttributesFromFile(filePath);
> >               //  if (attributes.size() > 0) {
> >               //      flowFile = session.putAllAttributes(flowFile, attributes);
> >               //  }
> >
> >                 final String fileURI = file.toURI().toString();
> >                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
> >                 session.transfer(flowFile, REL_SUCCESS);
> >                 logger.info("added {} to flow", new 
> > Object[]{flowFile});
> >
> >                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> >                     queueLock.lock();
> >                     try {
> >                         while (itr.hasNext()) {
> >                             final File nextFile = itr.next();
> >                             fileQueue.add(nextFile);
> >                             inProcess.remove(nextFile);
> >                         }
> >                     } finally {
> >                         queueLock.unlock();
> >                     }
> >                 }
> >             }
> >             session.commit();
> >         } catch (final Exception e) {
> >             logger.error("Failed to transfer files due to {}", e);
> >             context.yield();
> >
> >
> >             // anything that we've not already processed needs to be put back on the queue
> >             if (flowFile != null) {
> >                 session.remove(flowFile);
> >             }
> >         } finally {
> >             queueLock.lock();
> >             try {
> >                 inProcess.removeAll(files);
> >                 recentlyProcessed.addAll(files);
> >             } finally {
> >                 queueLock.unlock();
> >             }
> >         }
> >     }
> >
> > }
> >
> > Also please confirm which codebase you're running against.  Latest HEAD of master?
> >
> > I'm using a snap from GitHub that's several weeks old from August 
> > 25th (it's working fine with the original GetFile processor, which 
> > this code was derived from)
> >
> > Thanks
> > Joe
> >
> > On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> >> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
> >>
> >> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
> >>
> >> Not being intimate with the internals of the framework yet, not sure what would cause this.
> >>
> >> Rick
> >>
> >> -----Original Message-----
> >> From: Rick Braddy
> >> Sent: Friday, September 11, 2015 8:26 PM
> >> To: dev@nifi.apache.org
> >> Subject: RE: Transfer relationship not specified
> >> (FlowFileHandlingException)
> >>
> >> Mark,
> >>
> >> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
> >>
> >> Rick
> >>
> >>        final ListIterator<File> itr = files.listIterator();
> >>         FlowFile flowFile = null;
> >>         try {
> >>             while (itr.hasNext()) {
> >>                 final File file = itr.next();
> >>                 final Path filePath = file.toPath();
> >>                 final Path relativePath = filePath.relativize(filePath.getParent());
> >>                 String relativePathString = relativePath.toString() + "/";
> >>                 if (relativePathString.isEmpty()) {
> >>                     relativePathString = "./";
> >>                 }
> >>                 final Path absPath = filePath.toAbsolutePath();
> >>                 final String absPathString =
> >> absPath.getParent().toString() + "/";
> >>
> >>                 final long importStart = System.nanoTime();
> >>                 String fileType = "directory";
> >>                 flowFile = session.create();
> >>                 if (file.isFile()){
> >>                     fileType = "file";
> >>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> >>                 }
> >>                 final long importNanos = System.nanoTime() - importStart;
> >>                 final long importMillis = 
> >> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> >>
> >>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> >>                 Map<String, String> attributes = getAttributesFromFile(filePath);
> >>                 if (attributes.size() > 0) {
> >>                     flowFile = session.putAllAttributes(flowFile, attributes);
> >>                 }
> >>
> >>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
> >>                 session.transfer(flowFile, REL_SUCCESS);
> >>                 logger.info("added {} to flow", new 
> >> Object[]{flowFile});
> >>
> >>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> >>                     queueLock.lock();
> >>                     try {
> >>                         while (itr.hasNext()) {
> >>                             final File nextFile = itr.next();
> >>                             fileQueue.add(nextFile);
> >>                             inProcess.remove(nextFile);
> >>                         }
> >>                     } finally {
> >>                         queueLock.unlock();
> >>                     }
> >>                 }
> >>             }
> >>             session.commit();
> >>         } catch (final Exception e) {
> >>             logger.error("Failed to transfer files due to {}", e);
> >>
> >> -----Original Message-----
> >> From: Mark Payne [mailto:markap14@hotmail.com]
> >> Sent: Friday, September 11, 2015 6:39 PM
> >> To: dev@nifi.apache.org
> >> Subject: RE: Transfer relationship not specified
> >> (FlowFileHandlingException)
> >>
> >> Rick,
> >> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> >> I.e., there was never a call to session.transfer() for that FlowFile.
> >> Thanks-Mark
> >>
> >>> From: rbraddy@softnas.com
> >>> To: dev@nifi.apache.org
> >>> Subject: RE: Transfer relationship not specified
> >>> (FlowFileHandlingException)
> >>> Date: Fri, 11 Sep 2015 23:25:33 +0000
> >>>
> >>> Some more details:
> >>>
> >>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> >>>
> >>> So it seems there's some issue with each of the FlowFiles...
> >>>
> >>> -----Original Message-----
> >>> From: Rick Braddy [mailto:rbraddy@softnas.com]
> >>> Sent: Friday, September 11, 2015 6:00 PM
> >>> To: dev@nifi.apache.org
> >>> Subject: Transfer relationship not specified
> >>> (FlowFileHandlingException)
> >>>
> >>> Hi,
> >>>
> >>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> >>>
> >>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> >>>
> >>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship out is connected to another processor input as it should be.
> >>>
> >>> Any suggestions for troubleshooting?
> >>>
> >>> Rick
> >>>
> >>>
> >>
 		 	   		  

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Mark Payne <ma...@hotmail.com>.
Rick,
You must account for all FlowFiles that you pull in from queues. You can account for a FlowFile by either removing it or transferring it. In terms of what to do with the incoming FlowFiles, you have three options, really:
1. Transfer to an 'original' relationship. If you think this will be an "unused" relationship, then this is probably not a good option for you. I would never recommend creating a relationship that you don't believe will be useful.
2. Call session.remove() instead of session.transfer(). This will remove the incoming FlowFile from the flow. Given the description that you have provided, that means you will always create a single child FlowFile and then remove the parent. That would work, but it seems a bit confusing.
3. The option that I think makes the most sense for this use case. Do not create a new FlowFile at all. Instead, get a FlowFile via session.get() and then modify the contents of that FlowFile by calling session.importFrom() with the original FlowFile. This will then modify the content of the FlowFile, rather than creating a child. The FlowFile would then just be routed to success.
Regarding batching: generally for this type of Processor, I would recommend you do not pull in 10 FlowFiles at a time and keep a collection of them. Rather, I would pull in a single FlowFile and process it. I would then use the @SupportsBatching annotation. This allows the operator to make the trade-off of throughput vs. latency as appropriate for the particular flow. This approach also allows the Processor's code to be a little simpler, as it doesn't need to maintain Collections of FlowFiles - it just operates on a single FlowFile at a time.
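
To make option 3 a bit more concrete, here is a rough, untested sketch. The class name, the "success" relationship, and the hard-coded keep-source flag are just placeholders, and it assumes (as you described) that each incoming FlowFile's content is the path of the file to pick up:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;

@SupportsBatching
public class GetFileDataSketch extends AbstractProcessor {  // placeholder name

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles whose content was replaced with the requested file's content")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;  // nothing queued, nothing to account for
        }

        // Assumption: the incoming FlowFile's content is the path of the file to pick up.
        final AtomicReference<String> pathRef = new AtomicReference<>();
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                final ByteArrayOutputStream bos = new ByteArrayOutputStream();
                final byte[] buf = new byte[4096];
                int len;
                while ((len = in.read(buf)) != -1) {
                    bos.write(buf, 0, len);
                }
                pathRef.set(new String(bos.toByteArray(), StandardCharsets.UTF_8).trim());
            }
        });

        final Path filePath = Paths.get(pathRef.get());
        final boolean keepSourceFile = true;  // or drive this from a property, as your processor already does

        // Replace the content of the SAME FlowFile rather than creating a child,
        // so there is no second FlowFile left unaccounted for at commit time.
        flowFile = session.importFrom(filePath, keepSourceFile, flowFile);
        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filePath.getFileName().toString());
        session.getProvenanceReporter().modifyContent(flowFile);

        // Every FlowFile obtained from session.get() has now been transferred before commit.
        session.transfer(flowFile, REL_SUCCESS);
    }
}

If the content could ever be empty or not a valid path, you would of course want to catch that and route the FlowFile to a failure relationship instead, but this is the general shape.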
Hope this helps! Let us know if you have any further questions or issues.
Thanks-Mark

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> Date: Sat, 12 Sep 2015 18:16:48 +0000
> 
> I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.
> 
> Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.
> 
> Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.
> 
> Questions:
> 
> 1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?
> 
> 2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?
> 
> 3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
> 
> Thanks
> Rick
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com] 
> Sent: Saturday, September 12, 2015 8:50 AM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> 
> More information... Upon further debugging within the StandardProcessSession.checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.
> 
> The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:
> 
>        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
>         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
>         for (final StandardRepositoryRecord record : records.values()) {
>             if (record.isMarkedForDelete()) {
>                 continue;
>             }
>             final Relationship relationship = record.getTransferRelationship();
>          if (relationship == null) {
>                 rollback();
>  -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
>             }
> 
> I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.
> 
> Rick
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com]
> Sent: Saturday, September 12, 2015 7:42 AM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> 
> Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.
> 
> Rick
> 
> -----Original Message-----
> From: Joe Witt [mailto:joe.witt@gmail.com]
> Sent: Saturday, September 12, 2015 7:41 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified (FlowFileHandlingException)
> 
> Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.
> 
> On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> > Joe,
> >
> > Replies below.
> >
> > Rick
> >
> >
> > -----Original Message-----
> > From: Joe Witt [mailto:joe.witt@gmail.com]
> > Sent: Saturday, September 12, 2015 7:02 AM
> > To: dev@nifi.apache.org
> > Subject: Re: Transfer relationship not specified
> > (FlowFileHandlingException)
> >
> > Rick
> >
> > Can you show what is happening in the exception handling part of your code as well?
> >
> > Yes.  This version bypasses sending (empty) flowfiles for directory entries so only files get processed, and also has the attribute-setting calls disabled (which did not help).
> >
> > Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
> >
> > Here's the entire onTrigger method:
> >
> >     @Override
> >     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
> >         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
> >         final ProcessorLog logger = getLogger();
> >
> >         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
> >         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
> >             try {
> >                 final Set<File> filelist = getFileList(context, 
> > session);
> >
> >                 queueLock.lock();
> >                 try {
> >                     filelist.removeAll(inProcess);
> >                     if (!keepingSourceFile) {
> >                         filelist.removeAll(recentlyProcessed);
> >                     }
> >
> >                     fileQueue.clear();
> >                     fileQueue.addAll(filelist);
> >
> >                     queueLastUpdated.set(System.currentTimeMillis());
> >                     recentlyProcessed.clear();
> >
> >                     if (filelist.isEmpty()) {
> >                         context.yield();
> >                     }
> >                 } finally {
> >                     queueLock.unlock();
> >                 }
> >             } finally {
> >                 filelistLock.unlock();
> >             }
> >         }
> >
> >         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
> >         final List<File> files = new ArrayList<>(batchSize);
> >         queueLock.lock();
> >         try {
> >             fileQueue.drainTo(files, batchSize);
> >             if (files.isEmpty()) {
> >                 return;
> >             } else {
> >                 inProcess.addAll(files);
> >             }
> >         } finally {
> >             queueLock.unlock();
> >         }
> >
> >         final ListIterator<File> itr = files.listIterator();
> >         FlowFile flowFile = null;
> >         try {
> >             while (itr.hasNext()) {
> >                 final File file = itr.next();
> >                 final Path filePath = file.toPath();
> >                 final Path relativePath = filePath.relativize(filePath.getParent());
> >                 String relativePathString = relativePath.toString() + "/";
> >                 if (relativePathString.isEmpty()) {
> >                     relativePathString = "./";
> >                 }
> >                 final Path absPath = filePath.toAbsolutePath();
> >                 final String absPathString =
> > absPath.getParent().toString() + "/";
> >
> >                 final long importStart = System.nanoTime();
> >                 String fileType = "directory";
> >                 if (file.isFile()){
> >                     fileType = "file";
> >                     flowFile = session.create();
> >                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> >                 }
> >                 else
> >                 {
> >                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
> >                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
> >                 }
> >
> >                 final long importNanos = System.nanoTime() - importStart;
> >                 final long importMillis = 
> > TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> >
> >               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> >               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> >               //  Map<String, String> attributes = getAttributesFromFile(filePath);
> >               //  if (attributes.size() > 0) {
> >               //      flowFile = session.putAllAttributes(flowFile, attributes);
> >               //  }
> >
> >                 final String fileURI = file.toURI().toString();
> >                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
> >                 session.transfer(flowFile, REL_SUCCESS);
> >                 logger.info("added {} to flow", new 
> > Object[]{flowFile});
> >
> >                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> >                     queueLock.lock();
> >                     try {
> >                         while (itr.hasNext()) {
> >                             final File nextFile = itr.next();
> >                             fileQueue.add(nextFile);
> >                             inProcess.remove(nextFile);
> >                         }
> >                     } finally {
> >                         queueLock.unlock();
> >                     }
> >                 }
> >             }
> >             session.commit();
> >         } catch (final Exception e) {
> >             logger.error("Failed to transfer files due to {}", e);
> >             context.yield();
> >
> >
> >             // anything that we've not already processed needs to be put back on the queue
> >             if (flowFile != null) {
> >                 session.remove(flowFile);
> >             }
> >         } finally {
> >             queueLock.lock();
> >             try {
> >                 inProcess.removeAll(files);
> >                 recentlyProcessed.addAll(files);
> >             } finally {
> >                 queueLock.unlock();
> >             }
> >         }
> >     }
> >
> > }
> >
> > Also please confirm which codebase you're running against.  Latest HEAD of master?
> >
> > I'm using a snap from GitHub that's several weeks old from August 25th 
> > (it's working fine with the original GetFile processor, which this 
> > code was derived from)
> >
> > Thanks
> > Joe
> >
> > On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> >> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
> >>
> >> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
> >>
> >> Not being intimate with the internals of the framework yet, not sure what would cause this.
> >>
> >> Rick
> >>
> >> -----Original Message-----
> >> From: Rick Braddy
> >> Sent: Friday, September 11, 2015 8:26 PM
> >> To: dev@nifi.apache.org
> >> Subject: RE: Transfer relationship not specified
> >> (FlowFileHandlingException)
> >>
> >> Mark,
> >>
> >> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
> >>
> >> Rick
> >>
> >>        final ListIterator<File> itr = files.listIterator();
> >>         FlowFile flowFile = null;
> >>         try {
> >>             while (itr.hasNext()) {
> >>                 final File file = itr.next();
> >>                 final Path filePath = file.toPath();
> >>                 final Path relativePath = filePath.relativize(filePath.getParent());
> >>                 String relativePathString = relativePath.toString() + "/";
> >>                 if (relativePathString.isEmpty()) {
> >>                     relativePathString = "./";
> >>                 }
> >>                 final Path absPath = filePath.toAbsolutePath();
> >>                 final String absPathString =
> >> absPath.getParent().toString() + "/";
> >>
> >>                 final long importStart = System.nanoTime();
> >>                 String fileType = "directory";
> >>                 flowFile = session.create();
> >>                 if (file.isFile()){
> >>                     fileType = "file";
> >>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
> >>                 }
> >>                 final long importNanos = System.nanoTime() - importStart;
> >>                 final long importMillis = 
> >> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
> >>
> >>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
> >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
> >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
> >>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
> >>                 Map<String, String> attributes = getAttributesFromFile(filePath);
> >>                 if (attributes.size() > 0) {
> >>                     flowFile = session.putAllAttributes(flowFile, attributes);
> >>                 }
> >>
> >>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
> >>                 session.transfer(flowFile, REL_SUCCESS);
> >>                 logger.info("added {} to flow", new 
> >> Object[]{flowFile});
> >>
> >>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
> >>                     queueLock.lock();
> >>                     try {
> >>                         while (itr.hasNext()) {
> >>                             final File nextFile = itr.next();
> >>                             fileQueue.add(nextFile);
> >>                             inProcess.remove(nextFile);
> >>                         }
> >>                     } finally {
> >>                         queueLock.unlock();
> >>                     }
> >>                 }
> >>             }
> >>             session.commit();
> >>         } catch (final Exception e) {
> >>             logger.error("Failed to transfer files due to {}", e);
> >>
> >> -----Original Message-----
> >> From: Mark Payne [mailto:markap14@hotmail.com]
> >> Sent: Friday, September 11, 2015 6:39 PM
> >> To: dev@nifi.apache.org
> >> Subject: RE: Transfer relationship not specified
> >> (FlowFileHandlingException)
> >>
> >> Rick,
> >> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> >> I.e., there was never a call to session.transfer() for that FlowFile.
> >> Thanks-Mark
> >>
> >>> From: rbraddy@softnas.com
> >>> To: dev@nifi.apache.org
> >>> Subject: RE: Transfer relationship not specified
> >>> (FlowFileHandlingException)
> >>> Date: Fri, 11 Sep 2015 23:25:33 +0000
> >>>
> >>> Some more details:
> >>>
> >>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> >>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> >>>
> >>> So it seems there's some issue with each of the FlowFiles...
> >>>
> >>> -----Original Message-----
> >>> From: Rick Braddy [mailto:rbraddy@softnas.com]
> >>> Sent: Friday, September 11, 2015 6:00 PM
> >>> To: dev@nifi.apache.org
> >>> Subject: Transfer relationship not specified
> >>> (FlowFileHandlingException)
> >>>
> >>> Hi,
> >>>
> >>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> >>>
> >>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> >>>
> >>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship out is connected to another processor input as it should be.
> >>>
> >>> Any suggestions for troubleshooting?
> >>>
> >>> Rick
> >>>
> >>>
> >>
 		 	   		  

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles should be handled relative to session transactions.

Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles and places them into an internal queue.  It never calls session.transfer() for these inbound flowfiles.  It just reads the contents from each one using session.read() and processes that input.  Each of these flowfiles contains a directory or filename of a file.

Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and session.transfer() for each one, then finally calls session.commit().  It is at this point that the framework raises an exception FlowFileHandlingException (transfer relationship not specified) and fails.  Based upon debugging, it appears the exception is raised against the first flowfile record that was recorded - the first incoming flowfile, for which session.transfer() was never called.

Questions:

1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get() using session.transfer()?

2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL" like the SplitText processor does?

3. I read about the "SupportsBatching" attribute, which is not set on my processor (or the original GetFile processor, which does not read from input queues).  Given I am reading and writing flowfiles in batches, should this attribute be set?
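
If the answer to #1 and #2 is yes, I'm guessing the fix is roughly this at the end of my batch loop (REL_ORIGINAL would be a new relationship I'd have to add, or I would call session.remove() instead):

        // account for every incoming FlowFile pulled via session.get(10) before committing
        final List<FlowFile> originals = session.get(10);
        for (final FlowFile original : originals) {
            // ... session.read(original, ...) to get the path, then build and transfer the outbound FlowFile ...
            session.remove(original);  // or: session.transfer(original, REL_ORIGINAL);
        }
        session.commit();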

Thanks
Rick

-----Original Message-----
From: Rick Braddy [mailto:rbraddy@softnas.com] 
Sent: Saturday, September 12, 2015 8:50 AM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

More information... Upon further debugging within the StandardProcessSession.checkpoint() method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles that were read and processed, and 6 are the newly-created outbound flowfiles.

The for loop is throwing the exception on the very first record, which is actually one of the 10 inbound flowfiles that has already been processed:

       //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
        final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
        for (final StandardRepositoryRecord record : records.values()) {
            if (record.isMarkedForDelete()) {
                continue;
            }
            final Relationship relationship = record.getTransferRelationship();
         if (relationship == null) {
                rollback();
 -->          throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
            }

I modified the standard GetFile processor to accept incoming flowfiles that contain a file path per flowfile, so GetFileData (the new processor) can be triggered to process specific files.  I did NOT define a specific incoming relationship and just assumed there is one already available by default.  If there is not, that may be the problem.  There is clearly an inbound relationship established, as the inbound flowfiles are being read and processed just fine, but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.

Rick

-----Original Message-----
From: Rick Braddy [mailto:rbraddy@softnas.com]
Sent: Saturday, September 12, 2015 7:42 AM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.

Rick

-----Original Message-----
From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: Saturday, September 12, 2015 7:41 AM
To: dev@nifi.apache.org
Subject: Re: Transfer relationship not specified (FlowFileHandlingException)

Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.

On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> Joe,
>
> Replies below.
>
> Rick
>
>
> -----Original Message-----
> From: Joe Witt [mailto:joe.witt@gmail.com]
> Sent: Saturday, September 12, 2015 7:02 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified
> (FlowFileHandlingException)
>
> Rick
>
> Can you show what is happening in the exception handling part of your code as well?
>
> Yes.  This version bypasses sending (empty) flowfiles for directory entries so only files get processed, and also has the attribute-setting calls disabled (which did not help).
>
> Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
>
> Here's the entire onTrigger method:
>
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>         final ProcessorLog logger = getLogger();
>
>         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>             try {
>                 final Set<File> filelist = getFileList(context, 
> session);
>
>                 queueLock.lock();
>                 try {
>                     filelist.removeAll(inProcess);
>                     if (!keepingSourceFile) {
>                         filelist.removeAll(recentlyProcessed);
>                     }
>
>                     fileQueue.clear();
>                     fileQueue.addAll(filelist);
>
>                     queueLastUpdated.set(System.currentTimeMillis());
>                     recentlyProcessed.clear();
>
>                     if (filelist.isEmpty()) {
>                         context.yield();
>                     }
>                 } finally {
>                     queueLock.unlock();
>                 }
>             } finally {
>                 filelistLock.unlock();
>             }
>         }
>
>         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>         final List<File> files = new ArrayList<>(batchSize);
>         queueLock.lock();
>         try {
>             fileQueue.drainTo(files, batchSize);
>             if (files.isEmpty()) {
>                 return;
>             } else {
>                 inProcess.addAll(files);
>             }
>         } finally {
>             queueLock.unlock();
>         }
>
>         final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString =
> absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 if (file.isFile()){
>                     fileType = "file";
>                     flowFile = session.create();
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 }
>                 else
>                 {
>                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
>                 }
>
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = 
> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>               //  Map<String, String> attributes = getAttributesFromFile(filePath);
>               //  if (attributes.size() > 0) {
>               //      flowFile = session.putAllAttributes(flowFile, attributes);
>               //  }
>
>                 final String fileURI = file.toURI().toString();
>                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new 
> Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>             context.yield();
>
>
>             // anything that we've not already processed needs to be put back on the queue
>             if (flowFile != null) {
>                 session.remove(flowFile);
>             }
>         } finally {
>             queueLock.lock();
>             try {
>                 inProcess.removeAll(files);
>                 recentlyProcessed.addAll(files);
>             } finally {
>                 queueLock.unlock();
>             }
>         }
>     }
>
> }
>
> Also please confirm which codebase you're running against.  Latest HEAD of master?
>
> I'm using a snap from GitHub that's several weeks old from August 25th 
> (it's working fine with the original GetFile processor, which this 
> code was derived from)
>
> Thanks
> Joe
>
> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
>> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
>>
>> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>>
>> Not being intimate with the internals of the framework yet, I'm not sure what would cause this.
>>
>> Rick
>>
>> -----Original Message-----
>> From: Rick Braddy
>> Sent: Friday, September 11, 2015 8:26 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Mark,
>>
>> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
>>
>> Rick
>>
>>        final ListIterator<File> itr = files.listIterator();
>>         FlowFile flowFile = null;
>>         try {
>>             while (itr.hasNext()) {
>>                 final File file = itr.next();
>>                 final Path filePath = file.toPath();
>>                 final Path relativePath = filePath.relativize(filePath.getParent());
>>                 String relativePathString = relativePath.toString() + "/";
>>                 if (relativePathString.isEmpty()) {
>>                     relativePathString = "./";
>>                 }
>>                 final Path absPath = filePath.toAbsolutePath();
>>                 final String absPathString =
>> absPath.getParent().toString() + "/";
>>
>>                 final long importStart = System.nanoTime();
>>                 String fileType = "directory";
>>                 flowFile = session.create();
>>                 if (file.isFile()){
>>                     fileType = "file";
>>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>                 }
>>                 final long importNanos = System.nanoTime() - importStart;
>>                 final long importMillis = 
>> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>
>>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>>                 if (attributes.size() > 0) {
>>                     flowFile = session.putAllAttributes(flowFile, attributes);
>>                 }
>>
>>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>                 session.transfer(flowFile, REL_SUCCESS);
>>                 logger.info("added {} to flow", new 
>> Object[]{flowFile});
>>
>>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>                     queueLock.lock();
>>                     try {
>>                         while (itr.hasNext()) {
>>                             final File nextFile = itr.next();
>>                             fileQueue.add(nextFile);
>>                             inProcess.remove(nextFile);
>>                         }
>>                     } finally {
>>                         queueLock.unlock();
>>                     }
>>                 }
>>             }
>>             session.commit();
>>         } catch (final Exception e) {
>>             logger.error("Failed to transfer files due to {}", e);
>>
>> -----Original Message-----
>> From: Mark Payne [mailto:markap14@hotmail.com]
>> Sent: Friday, September 11, 2015 6:39 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Rick,
>> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
>> I.e., there was never a call to session.transfer() for that FlowFile.
>> Thanks-Mark
>>
>>> From: rbraddy@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>
>>> Some more details:
>>>
>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>>
>>> So it seems there's some issue with each of the FlowFiles...
>>>
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>> Sent: Friday, September 11, 2015 6:00 PM
>>> To: dev@nifi.apache.org
>>> Subject: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>>
>>> Hi,
>>>
>>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
>>>
>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>>
>>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the outgoing success relationship is connected to another processor's input as it should be.
>>>
>>> Any suggestions for troubleshooting?
>>>
>>> Rick
>>>
>>>
>>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.

Rick

-----Original Message-----
From: Joe Witt [mailto:joe.witt@gmail.com] 
Sent: Saturday, September 12, 2015 7:41 AM
To: dev@nifi.apache.org
Subject: Re: Transfer relationship not specified (FlowFileHandlingException)

Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with content and without.

On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> Joe,
>
> Replies below.
>
> Rick
>
>
> -----Original Message-----
> From: Joe Witt [mailto:joe.witt@gmail.com]
> Sent: Saturday, September 12, 2015 7:02 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified 
> (FlowFileHandlingException)
>
> Rick
>
> Can you show what is happening in the exception handling part of your code as well?
>
> Yes.  This version bypasses sending (empty) flowfiles for directory entries so that only files get processed, and it also has the attribute-setting code disabled (which did not help).
>
> Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
>
> Here's the entire onTrigger method:
>
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>         final ProcessorLog logger = getLogger();
>
>         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>             try {
>                 final Set<File> filelist = getFileList(context, 
> session);
>
>                 queueLock.lock();
>                 try {
>                     filelist.removeAll(inProcess);
>                     if (!keepingSourceFile) {
>                         filelist.removeAll(recentlyProcessed);
>                     }
>
>                     fileQueue.clear();
>                     fileQueue.addAll(filelist);
>
>                     queueLastUpdated.set(System.currentTimeMillis());
>                     recentlyProcessed.clear();
>
>                     if (filelist.isEmpty()) {
>                         context.yield();
>                     }
>                 } finally {
>                     queueLock.unlock();
>                 }
>             } finally {
>                 filelistLock.unlock();
>             }
>         }
>
>         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>         final List<File> files = new ArrayList<>(batchSize);
>         queueLock.lock();
>         try {
>             fileQueue.drainTo(files, batchSize);
>             if (files.isEmpty()) {
>                 return;
>             } else {
>                 inProcess.addAll(files);
>             }
>         } finally {
>             queueLock.unlock();
>         }
>
>         final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = 
> absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 if (file.isFile()){
>                     fileType = "file";
>                     flowFile = session.create();
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 }
>                 else
>                 {
>                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
>                 }
>
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = 
> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>               //  Map<String, String> attributes = getAttributesFromFile(filePath);
>               //  if (attributes.size() > 0) {
>               //      flowFile = session.putAllAttributes(flowFile, attributes);
>               //  }
>
>                 final String fileURI = file.toURI().toString();
>                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new 
> Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>             context.yield();
>
>
>             // anything that we've not already processed needs to be put back on the queue
>             if (flowFile != null) {
>                 session.remove(flowFile);
>             }
>         } finally {
>             queueLock.lock();
>             try {
>                 inProcess.removeAll(files);
>                 recentlyProcessed.addAll(files);
>             } finally {
>                 queueLock.unlock();
>             }
>         }
>     }
>
> }
>
> Also please confirm which codebase you're running against.  Latest HEAD of master?
>
> I'm using a snap from GitHub that's several weeks old from August 25th 
> (it's working fine with the original GetFile processor, which this 
> code was derived from)
>
> Thanks
> Joe
>
> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
>> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
>>
>> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>>
>> Not being intimate with the internals of the framework yet, I'm not sure what would cause this.
>>
>> Rick
>>
>> -----Original Message-----
>> From: Rick Braddy
>> Sent: Friday, September 11, 2015 8:26 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Mark,
>>
>> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
>>
>> Rick
>>
>>        final ListIterator<File> itr = files.listIterator();
>>         FlowFile flowFile = null;
>>         try {
>>             while (itr.hasNext()) {
>>                 final File file = itr.next();
>>                 final Path filePath = file.toPath();
>>                 final Path relativePath = filePath.relativize(filePath.getParent());
>>                 String relativePathString = relativePath.toString() + "/";
>>                 if (relativePathString.isEmpty()) {
>>                     relativePathString = "./";
>>                 }
>>                 final Path absPath = filePath.toAbsolutePath();
>>                 final String absPathString =
>> absPath.getParent().toString() + "/";
>>
>>                 final long importStart = System.nanoTime();
>>                 String fileType = "directory";
>>                 flowFile = session.create();
>>                 if (file.isFile()){
>>                     fileType = "file";
>>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>                 }
>>                 final long importNanos = System.nanoTime() - importStart;
>>                 final long importMillis = 
>> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>
>>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>>                 if (attributes.size() > 0) {
>>                     flowFile = session.putAllAttributes(flowFile, attributes);
>>                 }
>>
>>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>                 session.transfer(flowFile, REL_SUCCESS);
>>                 logger.info("added {} to flow", new 
>> Object[]{flowFile});
>>
>>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>                     queueLock.lock();
>>                     try {
>>                         while (itr.hasNext()) {
>>                             final File nextFile = itr.next();
>>                             fileQueue.add(nextFile);
>>                             inProcess.remove(nextFile);
>>                         }
>>                     } finally {
>>                         queueLock.unlock();
>>                     }
>>                 }
>>             }
>>             session.commit();
>>         } catch (final Exception e) {
>>             logger.error("Failed to transfer files due to {}", e);
>>
>> -----Original Message-----
>> From: Mark Payne [mailto:markap14@hotmail.com]
>> Sent: Friday, September 11, 2015 6:39 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Rick,
>> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
>> I.e., there was never a call to session.transfer() for that FlowFile.
>> Thanks-Mark
>>
>>> From: rbraddy@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>
>>> Some more details:
>>>
>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>>
>>> So it seems there's some issue with each of the FlowFiles...
>>>
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>> Sent: Friday, September 11, 2015 6:00 PM
>>> To: dev@nifi.apache.org
>>> Subject: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>>
>>> Hi,
>>>
>>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
>>>
>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>>
>>> I'm assuming this is supposed to be indicating there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the outgoing success relationship is connected to another processor's input as it should be.
>>>
>>> Any suggestions for troubleshooting?
>>>
>>> Rick
>>>
>>>
>>

Re: Transfer relationship not specified (FlowFileHandlingException)

Posted by Joe Witt <jo...@gmail.com>.
Yep.  Looks legit to me.  Will try a unit test with a mixture of
flowFiles associated with content and without.
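
A rough outline of that kind of test with the nifi-mock TestRunner, assuming the processor under test is the GetFileData class from this thread and that it exposes a REL_SUCCESS relationship; the attribute name below is a placeholder as well, so this is a sketch, not the eventual NiFi unit test:

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class TestGetFileData {

    @Test
    public void testMixOfFlowFilesWithAndWithoutContent() {
        final TestRunner runner = TestRunners.newTestRunner(new GetFileData());

        // one incoming FlowFile that carries content (a path in the body)...
        runner.enqueue("/tmp/printargs.c".getBytes(StandardCharsets.UTF_8));
        // ...and one with no content at all, only an attribute
        runner.enqueue(new byte[0],
                Collections.singletonMap("absolute.path", "/tmp/anImage.png"));

        runner.run();

        // every FlowFile the session saw must end up transferred or removed
        // for the run to pass, mirroring the checkpoint() rule in the real flow
        runner.assertAllFlowFilesTransferred(GetFileData.REL_SUCCESS, 2);
    }
}

Enqueuing one FlowFile with content and one with only attributes reproduces the mixture described above, and an unaccounted-for FlowFile fails the run the same way it fails the real session commit.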

On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rb...@softnas.com> wrote:
> Joe,
>
> Replies below.
>
> Rick
>
>
> -----Original Message-----
> From: Joe Witt [mailto:joe.witt@gmail.com]
> Sent: Saturday, September 12, 2015 7:02 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified (FlowFileHandlingException)
>
> Rick
>
> Can you show what is happening in the exception handling part of your code as well?
>
> Yes.  This version bypasses sending (empty) flowfiles for directory entries so that only files get processed, and it also has the attribute-setting code disabled (which did not help).
>
> Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().
>
> Here's the entire onTrigger method:
>
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>         final ProcessorLog logger = getLogger();
>
>         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>             try {
>                 final Set<File> filelist = getFileList(context, session);
>
>                 queueLock.lock();
>                 try {
>                     filelist.removeAll(inProcess);
>                     if (!keepingSourceFile) {
>                         filelist.removeAll(recentlyProcessed);
>                     }
>
>                     fileQueue.clear();
>                     fileQueue.addAll(filelist);
>
>                     queueLastUpdated.set(System.currentTimeMillis());
>                     recentlyProcessed.clear();
>
>                     if (filelist.isEmpty()) {
>                         context.yield();
>                     }
>                 } finally {
>                     queueLock.unlock();
>                 }
>             } finally {
>                 filelistLock.unlock();
>             }
>         }
>
>         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>         final List<File> files = new ArrayList<>(batchSize);
>         queueLock.lock();
>         try {
>             fileQueue.drainTo(files, batchSize);
>             if (files.isEmpty()) {
>                 return;
>             } else {
>                 inProcess.addAll(files);
>             }
>         } finally {
>             queueLock.unlock();
>         }
>
>         final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 if (file.isFile()){
>                     fileType = "file";
>                     flowFile = session.create();
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 }
>                 else
>                 {
>                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
>                 }
>
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>               //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>               //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>               //  Map<String, String> attributes = getAttributesFromFile(filePath);
>               //  if (attributes.size() > 0) {
>               //      flowFile = session.putAllAttributes(flowFile, attributes);
>               //  }
>
>                 final String fileURI = file.toURI().toString();
>                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>             context.yield();
>
>
>             // anything that we've not already processed needs to be put back on the queue
>             if (flowFile != null) {
>                 session.remove(flowFile);
>             }
>         } finally {
>             queueLock.lock();
>             try {
>                 inProcess.removeAll(files);
>                 recentlyProcessed.addAll(files);
>             } finally {
>                 queueLock.unlock();
>             }
>         }
>     }
>
> }
>
> Also please confirm which codebase you're running against.  Latest HEAD of master?
>
> I'm using a snap from GitHub that's several weeks old from August 25th (it's working fine with the original GetFile processor, which this code was derived from)
>
> Thanks
> Joe
>
> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
>> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
>>
>> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>>
>> Not being intimate with the internals of the framework yet, I'm not sure what would cause this.
>>
>> Rick
>>
>> -----Original Message-----
>> From: Rick Braddy
>> Sent: Friday, September 11, 2015 8:26 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Mark,
>>
>> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
>>
>> Rick
>>
>>        final ListIterator<File> itr = files.listIterator();
>>         FlowFile flowFile = null;
>>         try {
>>             while (itr.hasNext()) {
>>                 final File file = itr.next();
>>                 final Path filePath = file.toPath();
>>                 final Path relativePath = filePath.relativize(filePath.getParent());
>>                 String relativePathString = relativePath.toString() + "/";
>>                 if (relativePathString.isEmpty()) {
>>                     relativePathString = "./";
>>                 }
>>                 final Path absPath = filePath.toAbsolutePath();
>>                 final String absPathString =
>> absPath.getParent().toString() + "/";
>>
>>                 final long importStart = System.nanoTime();
>>                 String fileType = "directory";
>>                 flowFile = session.create();
>>                 if (file.isFile()){
>>                     fileType = "file";
>>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>                 }
>>                 final long importNanos = System.nanoTime() - importStart;
>>                 final long importMillis =
>> TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>
>>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>>                 if (attributes.size() > 0) {
>>                     flowFile = session.putAllAttributes(flowFile, attributes);
>>                 }
>>
>>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>                 session.transfer(flowFile, REL_SUCCESS);
>>                 logger.info("added {} to flow", new
>> Object[]{flowFile});
>>
>>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>                     queueLock.lock();
>>                     try {
>>                         while (itr.hasNext()) {
>>                             final File nextFile = itr.next();
>>                             fileQueue.add(nextFile);
>>                             inProcess.remove(nextFile);
>>                         }
>>                     } finally {
>>                         queueLock.unlock();
>>                     }
>>                 }
>>             }
>>             session.commit();
>>         } catch (final Exception e) {
>>             logger.error("Failed to transfer files due to {}", e);
>>
>> -----Original Message-----
>> From: Mark Payne [mailto:markap14@hotmail.com]
>> Sent: Friday, September 11, 2015 6:39 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Rick,
>> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
>> I.e., there was never a call to session.transfer() for that FlowFile.
>> Thanks-Mark
>>
>>> From: rbraddy@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>
>>> Some more details:
>>>
>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>>
>>> So it seems there's some issue with each of the FlowFiles...
>>>
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>> Sent: Friday, September 11, 2015 6:00 PM
>>> To: dev@nifi.apache.org
>>> Subject: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>>
>>> Hi,
>>>
>>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
>>>
>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {} org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>>
>>> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship's output is connected to another processor's input as it should be.
>>>
>>> Any suggestions for troubleshooting?
>>>
>>> Rick
>>>
>>>
>>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
Joe,

Replies below.

Rick


-----Original Message-----
From: Joe Witt [mailto:joe.witt@gmail.com] 
Sent: Saturday, September 12, 2015 7:02 AM
To: dev@nifi.apache.org
Subject: Re: Transfer relationship not specified (FlowFileHandlingException)

Rick

Can you show what is happening in the exception handling part of your code as well?

Yes.  In this version, sending (empty) flowfiles for directory entries is bypassed so only files get processed, and setting the attributes is also disabled (which did not help).

Session.commit() is throwing the exception - no other errors or issues from session.importFrom() or session.transfer().

Here's the entire onTrigger method:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
        final ProcessorLog logger = getLogger();

        final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
        if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
            try {
                final Set<File> filelist = getFileList(context, session);

                queueLock.lock();
                try {
                    filelist.removeAll(inProcess);
                    if (!keepingSourceFile) {
                        filelist.removeAll(recentlyProcessed);
                    }

                    fileQueue.clear();
                    fileQueue.addAll(filelist);

                    queueLastUpdated.set(System.currentTimeMillis());
                    recentlyProcessed.clear();

                    if (filelist.isEmpty()) {
                        context.yield();
                    }
                } finally {
                    queueLock.unlock();
                }
            } finally {
                filelistLock.unlock();
            }
        }

        final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
        final List<File> files = new ArrayList<>(batchSize);
        queueLock.lock();
        try {
            fileQueue.drainTo(files, batchSize);
            if (files.isEmpty()) {
                return;
            } else {
                inProcess.addAll(files);
            }
        } finally {
            queueLock.unlock();
        }

        final ListIterator<File> itr = files.listIterator();
        FlowFile flowFile = null;
        try {
            while (itr.hasNext()) {
                final File file = itr.next();
                final Path filePath = file.toPath();
                final Path relativePath = filePath.relativize(filePath.getParent());
                String relativePathString = relativePath.toString() + "/";
                if (relativePathString.isEmpty()) {
                    relativePathString = "./";
                }
                final Path absPath = filePath.toAbsolutePath();
                final String absPathString = absPath.getParent().toString() + "/";

                final long importStart = System.nanoTime();
                String fileType = "directory";
                if (file.isFile()){
                    fileType = "file";
                    flowFile = session.create();
                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
                }
                else
                {
                   logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
                   continue; // ******* SKIP DIRECTORIES FOR NOW ****
                }

                final long importNanos = System.nanoTime() - importStart;
                final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

              //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
              //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
              //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
              //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
              //  Map<String, String> attributes = getAttributesFromFile(filePath);
              //  if (attributes.size() > 0) {
              //      flowFile = session.putAllAttributes(flowFile, attributes);
              //  }

                final String fileURI = file.toURI().toString();
                session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
                session.transfer(flowFile, REL_SUCCESS);
                logger.info("added {} to flow", new Object[]{flowFile});

                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
                    queueLock.lock();
                    try {
                        while (itr.hasNext()) {
                            final File nextFile = itr.next();
                            fileQueue.add(nextFile);
                            inProcess.remove(nextFile);
                        }
                    } finally {
                        queueLock.unlock();
                    }
                }
            }
            session.commit();
        } catch (final Exception e) {
            logger.error("Failed to transfer files due to {}", e);
            context.yield();
            

            // anything that we've not already processed needs to be put back on the queue
            if (flowFile != null) {
                session.remove(flowFile);
            }
        } finally {
            queueLock.lock();
            try {
                inProcess.removeAll(files);
                recentlyProcessed.addAll(files);
            } finally {
                queueLock.unlock();
            }
        }
    }

}

Also please confirm which codebase you're running against.  Latest HEAD of master?

I'm using a snapshot from GitHub that's several weeks old, from August 25th (it's working fine with the original GetFile processor, which this code was derived from).

Thanks
Joe

On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
>
> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>
> Not being intimate with the internals of the framework yet, not sure what would cause this.
>
> Rick
>
> -----Original Message-----
> From: Rick Braddy
> Sent: Friday, September 11, 2015 8:26 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
>
> Mark,
>
> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
>
> Rick
>
>        final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 flowFile = session.create();
>                 if (file.isFile()){
>                     fileType = "file";
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 }
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>                 if (attributes.size() > 0) {
>                     flowFile = session.putAllAttributes(flowFile, attributes);
>                 }
>
>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>
> -----Original Message-----
> From: Mark Payne [mailto:markap14@hotmail.com]
> Sent: Friday, September 11, 2015 6:39 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified 
> (FlowFileHandlingException)
>
> Rick,
> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> I.e., there was never a call to session.transfer() for that FlowFile.
> Thanks-Mark
>
>> From: rbraddy@softnas.com
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>
>> Some more details:
>>
>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>
>> So it seems there's some issue with each of the FlowFiles...
>>
>> -----Original Message-----
>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>> Sent: Friday, September 11, 2015 6:00 PM
>> To: dev@nifi.apache.org
>> Subject: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Hi,
>>
>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
>>
>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>
>> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship's output is connected to another processor's input as it should be.
>>
>> Any suggestions for troubleshooting?
>>
>> Rick
>>
>>
>

Re: Transfer relationship not specified (FlowFileHandlingException)

Posted by Joe Witt <jo...@gmail.com>.
Rick

Can you show what is happening in the exception handling part of your
code as well?

Also please confirm which codebase you're running against.  Latest
HEAD of master?

Thanks
Joe

On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rb...@softnas.com> wrote:
> So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.
>
> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.
>
> Not being intimate with the internals of the framework yet, not sure what would cause this.
>
> Rick
>
> -----Original Message-----
> From: Rick Braddy
> Sent: Friday, September 11, 2015 8:26 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
>
> Mark,
>
> The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.
>
> Rick
>
>        final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 flowFile = session.create();
>                 if (file.isFile()){
>                     fileType = "file";
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 }
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>                 if (attributes.size() > 0) {
>                     flowFile = session.putAllAttributes(flowFile, attributes);
>                 }
>
>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>
> -----Original Message-----
> From: Mark Payne [mailto:markap14@hotmail.com]
> Sent: Friday, September 11, 2015 6:39 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
>
> Rick,
> This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
> I.e., there was never a call to session.transfer() for that FlowFile.
> Thanks-Mark
>
>> From: rbraddy@softnas.com
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>
>> Some more details:
>>
>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>
>> So it seems there's some issue with each of the FlowFiles...
>>
>> -----Original Message-----
>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>> Sent: Friday, September 11, 2015 6:00 PM
>> To: dev@nifi.apache.org
>> Subject: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Hi,
>>
>> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
>>
>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
>>
>> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship's output is connected to another processor's input as it should be.
>>
>> Any suggestions for troubleshooting?
>>
>> Rick
>>
>>
>

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
So the "transfer relationship not specified" occurs down in the Provenance processing, where it checks to see if there are flowfile records associated with the session/relationship.

There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct number of calls to importFrom and transfer(), so this confirms that transfer() is called and did record the outbound flowfiles, yet when the provenance subsystem looks for these records it does not find them.

Not being intimate with the internals of the framework yet, not sure what would cause this.
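
One way to exercise the same accounting outside the debugger is the nifi-mock TestRunner. The sketch below is a rough illustration only: the "Input Directory" property name and the GetFileData / REL_SUCCESS references are assumptions about this processor, not something verified against it.

import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class GetFileDataAccountingSketch {

    public static void main(final String[] args) throws Exception {
        // Put two real files where the processor will look for them.
        final Path dir = Files.createTempDirectory("getfiledata-test");
        Files.write(dir.resolve("a.txt"), "hello".getBytes());
        Files.write(dir.resolve("b.txt"), "world".getBytes());

        // Assumes GetFileData is on the classpath; the property name is illustrative.
        final TestRunner runner = TestRunners.newTestRunner(GetFileData.class);
        runner.setProperty("Input Directory", dir.toString());

        runner.run();  // one onTrigger invocation against a mock framework session

        // Fails if any FlowFile created during onTrigger was never transferred or
        // removed before the session was committed, i.e. the situation behind the
        // FlowFileHandlingException in these logs.
        runner.assertAllFlowFilesTransferred(GetFileData.REL_SUCCESS, 2);
    }
}

If the accounting is off, the assertion fails (or the run surfaces the same FlowFileHandlingException), which makes the problem reproducible outside a live debugger.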

Rick

-----Original Message-----
From: Rick Braddy 
Sent: Friday, September 11, 2015 8:26 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Mark,

The interesting thing is that session.transfer() is being called, as I have stepped through it in the debugger.  I'm only calling importFrom() for actual files (not directories), as shown below.  This is a modified version of GetFile processor.

Rick

       final ListIterator<File> itr = files.listIterator();
        FlowFile flowFile = null;
        try {
            while (itr.hasNext()) {
                final File file = itr.next();
                final Path filePath = file.toPath();
                final Path relativePath = filePath.relativize(filePath.getParent());
                String relativePathString = relativePath.toString() + "/";
                if (relativePathString.isEmpty()) {
                    relativePathString = "./";
                }
                final Path absPath = filePath.toAbsolutePath();
                final String absPathString = absPath.getParent().toString() + "/";

                final long importStart = System.nanoTime();
                String fileType = "directory";
                flowFile = session.create();
                if (file.isFile()){
                    fileType = "file";
                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
                }
                final long importNanos = System.nanoTime() - importStart;
                final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

                flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
                flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
                flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                Map<String, String> attributes = getAttributesFromFile(filePath);
                if (attributes.size() > 0) {
                    flowFile = session.putAllAttributes(flowFile, attributes);
                }

                session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
                session.transfer(flowFile, REL_SUCCESS);
                logger.info("added {} to flow", new Object[]{flowFile});

                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
                    queueLock.lock();
                    try {
                        while (itr.hasNext()) {
                            final File nextFile = itr.next();
                            fileQueue.add(nextFile);
                            inProcess.remove(nextFile);
                        }
                    } finally {
                        queueLock.unlock();
                    }
                }
            }
            session.commit();
        } catch (final Exception e) {
            logger.error("Failed to transfer files due to {}", e);

-----Original Message-----
From: Mark Payne [mailto:markap14@hotmail.com]
Sent: Friday, September 11, 2015 6:39 PM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Rick,
This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
I.e., there was never a call to session.transfer() for that FlowFile.
Thanks-Mark

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified
> (FlowFileHandlingException)
> Date: Fri, 11 Sep 2015 23:25:33 +0000
> 
> Some more details:
> 
> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
> 
> So it seems there's some issue with each of the FlowFiles...
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com]
> Sent: Friday, September 11, 2015 6:00 PM
> To: dev@nifi.apache.org
> Subject: Transfer relationship not specified
> (FlowFileHandlingException)
> 
> Hi,
> 
> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> 
> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> 
> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship's output is connected to another processor's input as it should be.
> 
> Any suggestions for troubleshooting?
> 
> Rick
> 
> 
 		 	   		  

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Mark Payne <ma...@hotmail.com>.
Rick,
This error message isn't indicating that there's no Connection for the Relationship, but rather that the FlowFile was never transferred.
I.e., there was never a call to session.transfer() for that FlowFile.
Thanks-Mark
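
To make the rule concrete, below is a minimal sketch of a processor that satisfies the contract: every FlowFile obtained from the session is either transferred or removed before the session is committed. The class name, file path, and relationship constant are placeholders; this is not the GetFileData processor from this thread.

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

// Sketch only: illustrates the transfer-or-remove rule, not the processor under discussion.
public class TransferOrRemoveSketch extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Files successfully read")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final Path path = Paths.get("/tmp/example.txt");  // placeholder input file
        FlowFile flowFile = session.create();             // the session now tracks this FlowFile
        try {
            flowFile = session.importFrom(path, true, flowFile);
            session.transfer(flowFile, REL_SUCCESS);      // accounted for: transferred
        } catch (final Exception e) {
            session.remove(flowFile);                     // accounted for: removed
        }
        session.commit();                                 // succeeds because no FlowFile is left dangling
        // A FlowFile that is created (or imported) and then neither transferred nor
        // removed is what triggers "transfer relationship not specified" at commit time.
    }
}

The same rule applies whether the processor commits the session explicitly, as the GetFile-style code in this thread does, or lets the framework commit it after onTrigger returns.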

> From: rbraddy@softnas.com
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified (FlowFileHandlingException)
> Date: Fri, 11 Sep 2015 23:25:33 +0000
> 
> Some more details:
> 
> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow
> ...
> 
> So it seems there's some issue with each of the FlowFiles...
> 
> -----Original Message-----
> From: Rick Braddy [mailto:rbraddy@softnas.com] 
> Sent: Friday, September 11, 2015 6:00 PM
> To: dev@nifi.apache.org
> Subject: Transfer relationship not specified (FlowFileHandlingException)
> 
> Hi,
> 
> I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:
> 
> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified
> 
> I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship's output is connected to another processor's input as it should be.
> 
> Any suggestions for troubleshooting?
> 
> Rick
> 
> 
 		 	   		  

RE: Transfer relationship not specified (FlowFileHandlingException)

Posted by Rick Braddy <rb...@softnas.com>.
Some more details:

2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190] is not known in this session (StandardProcessSession[id=6967])
2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418] to flow
2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow
...

So it seems there's some issue with each of the FlowFiles...

-----Original Message-----
From: Rick Braddy [mailto:rbraddy@softnas.com] 
Sent: Friday, September 11, 2015 6:00 PM
To: dev@nifi.apache.org
Subject: Transfer relationship not specified (FlowFileHandlingException)

Hi,

I have a processor that appears to be creating FlowFiles correctly (modified a standard processor), but when it goes to commit() the session, an exception is raised:

2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}  org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42] transfer relationship not specified

I'm assuming this is supposed to indicate there's no connection available to commit the transfer; however, there is a "success" relationship registered during init() in the same way as the original processor did it, and the success relationship's output is connected to another processor's input as it should be.

Any suggestions for troubleshooting?

Rick