You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by Russell Bateman <ru...@perfectsearchcorp.com> on 2016/03/23 22:33:14 UTC

Can't add an attribute inside session read call-back

I stumbled upon something peculiar in NiFi. I had been attaching an 
attribute to a flowfile in the session.read() call-back. The NiFi unit 
testing framework tolerated this, but when I finally ran my processor 
for real, it blew chunks (IllegalStateException). I solved the problem 
by saving the new attribute and attaching it instead outside the 
call-back just before calling session.transfer().

The only reason I'm pointing this out is because I somewhat rely on unit 
testing and hope to catch this level of error/gotcha earlier than 
button-pushing testing

--in case anyone cares.

Open to comments, cat-calls, etc.

;-)

Best regards,

Russ

Re: Can't add an attribute inside session read call-back

Posted by Russell Bateman <ru...@perfectsearchcorp.com>.
Hope not to make copy/paste mistakes. I can share the code snippets, but 
unfortunately not a trace. I didn't keep one and my efforts to restore 
the broken code only leave me frustrated. (I've obliterated my NAR 
several times, but the debugger tells me that the lines don't match up 
when I step through, can't set breakpoints, etc. Sorry.)

How I was doing it:

   public void onTrigger( final ProcessSession session, final 
DataExtractor filter, final String regexPattern )
       throws ProcessException
   {
     final FlowFile flowfile = session.get();
*    final AtomicReference< FlowFile > flowFileHolder = new 
AtomicReference<>();**
**    flowFileHolder.set( flowfile );*

     session.read( flowfile, new InputStreamCallback()
     {
       @Override
       public void process( InputStream in ) throws IOException
       {
         RegularExpressionMatch match;

         try
         {
           // slurp the in-coming stream to match into...
           Matcher matcher = Pattern.compile( regexPattern ).matcher( 
StreamUtilities.slurp( in, -1 ) );
           if( !matcher.find() )
             throw new IOException( "Failed to match regular expression 
pattern in the document" );
           match = new RegularExpressionMatch( matcher );
         }
         catch( Exception e )
         {
           throw new IOException( "Failed to create regular expression 
matcher with passed properties" );
         }

         Tag    tag  = filter.buildTag( match );
         String xml;
         xml = ( Utilities.isEmpty( tag ) )
                   ? "(No tag built for this document.)"
                   : new TagUtilities( tag, true /*properties.debug*/ 
).getXmlFromTag( 0 );
*       try**
**       {**
**         FlowFile flowFileWithAttributes = flowFileHolder.get();**
**         flowFileWithAttributes = session.putAttribute( 
flowFileWithAttributes, "concept", xml );**
**         flowFileHolder.set( flowFileWithAttributes );**
        }
        catch( Throwable e )
        {
----->   e.printStackTrace();      // IllegalStateException
        }
*      }
     } );

     // the flowfile now has attributes we put on it...
     session.transfer( flowFileHolder.get(), new 
ProcessorRelationships().getSuccess() );
   }

How I'm doing it now:

    public void onTrigger( final ProcessSession session, final
    DataExtractor filter, final String regexPattern )
         throws ProcessException
    {
       FlowFile flowfile = session.get();
    *  final String                    attribute;*
    *  final AtomicReference< String > attributeHolder = new
    AtomicReference<>();*

       session.read( flowfile, new InputStreamCallback()
       {
         @Override
         public void process( InputStream in ) throws IOException
         {
           RegularExpressionMatch match;

           try
           {
             // slurp the in-coming stream to match into...
             String incoming = StreamUtilities.slurp( in, -1 );
             Matcher matcher = Pattern.compile( regexPattern ).matcher(
    incoming );
             if( !matcher.find() )
               throw new IOException( "Failed to match regular
    expression pattern in the document" );
             match = new RegularExpressionMatch( matcher );
           }
           catch( Exception e )
           {
             throw new IOException( "Failed to create regular expression
    matcher with passed properties" );
           }

           Tag    tag  = filter.buildTag( match );
           String xml;
           xml = ( Utilities.isEmpty( tag ) )
                     ? "(No tag built for this document.)"
                     : new TagUtilities( tag, true /*properties.debug*/
    ).getXmlFromTag( 0 );
    *      attributeHolder.set( xml );*
         }
       } );

       /* Can't change anything about the session right inside here:
    don't change flowfiles
        * in the callback! Set the attribute here instead of inside the
    read!
        */
    *  attribute = attributeHolder.get();*
    *  flowfile  = session.putAttribute( flowfile, "concept", attribute );*

       // the flowfile now has attributes we put on it...
       session.transfer( flowfile, new
    ProcessorRelationships().getSuccess() );
    }

On 03/23/2016 03:42 PM, Oleg Zhurakousky wrote:
> Russell
>
> This doesn’t sound right. Would you care to share a code snippet on how you set the attribute as well as stack trace?
>
> Cheers
> Oleg
>
>> On Mar 23, 2016, at 5:33 PM, Russell Bateman <ru...@perfectsearchcorp.com> wrote:
>>
>> I stumbled upon something peculiar in NiFi. I had been attaching an attribute to a flowfile in the session.read() call-back. The NiFi unit testing framework tolerated this, but when I finally ran my processor for real, it blew chunks (IllegalStateException). I solved the problem by saving the new attribute and attaching it instead outside the call-back just before calling session.transfer().
>>
>> The only reason I'm pointing this out is because I somewhat rely on unit testing and hope to catch this level of error/gotcha earlier than button-pushing testing
>>
>> --in case anyone cares.
>>
>> Open to comments, cat-calls, etc.
>>
>> ;-)
>>
>> Best regards,
>>
>> Russ

Re: Can't add an attribute inside session read call-back

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Russell 

This doesn’t sound right. Would you care to share a code snippet on how you set the attribute as well as stack trace?

Cheers
Oleg

> On Mar 23, 2016, at 5:33 PM, Russell Bateman <ru...@perfectsearchcorp.com> wrote:
> 
> I stumbled upon something peculiar in NiFi. I had been attaching an attribute to a flowfile in the session.read() call-back. The NiFi unit testing framework tolerated this, but when I finally ran my processor for real, it blew chunks (IllegalStateException). I solved the problem by saving the new attribute and attaching it instead outside the call-back just before calling session.transfer().
> 
> The only reason I'm pointing this out is because I somewhat rely on unit testing and hope to catch this level of error/gotcha earlier than button-pushing testing
> 
> --in case anyone cares.
> 
> Open to comments, cat-calls, etc.
> 
> ;-)
> 
> Best regards,
> 
> Russ
>