Posted to dev@nifi.apache.org by idioma <co...@gmail.com> on 2016/04/13 15:14:04 UTC

catch commit error in OnTrigger to diversify session behaviour

Hi,
I have a custom Apache NiFi processor whose aim is to load a sample graph
into Titan DB. I have a load method that creates the graph with a few
vertices/edges and commits the transaction (transaction.commit() is wrapped
in a try/catch block to catch any TitanException). Inside my onTrigger
I have the following:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    FlowFile flowFile = session.get();
    if (flowFile == null) return;

    session.read(flowFile, new InputStreamCallback() {

        @Override
        public void process(InputStream in) throws IOException {

            StringWriter strWriter = new StringWriter();
            IOUtils.copy(in, strWriter, "UTF-8");
            String contents = strWriter.toString();

            try {
                load(contents);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });
}

The load method takes a string, which is the InputStream converted to text.
Depending on whether the commit succeeds or not, I would like to either call
session.rollback(true) and session.transfer(flowFile, FAILURE), or call
session.transfer(flowFile, SUCCESS).

Can you help please?



--
View this message in context: http://apache-nifi-developer-list.39713.n7.nabble.com/catch-commit-error-in-OnTrigger-to-diversify-session-behaviour-tp9027.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

Re: catch commit error in OnTrigger to diversify session behaviour

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Idioma

Keep an eye on this https://issues.apache.org/jira/browse/NIFI-1771 and consider using java.util.concurrent.atomic.AtomicReference
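
For illustration, here is a minimal sketch of the AtomicReference approach in plain Java. The StreamCallback interface and the read(...) helper are hypothetical stand-ins for NiFi's InputStreamCallback and session.read(...), so the snippet runs without NiFi on the classpath. It also assumes Java 9+ for InputStream.readAllBytes(); on older JVMs something like IOUtils.copy would be needed instead.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

class AtomicHolderSketch {

    // Hypothetical stand-in for NiFi's InputStreamCallback.
    interface StreamCallback {
        void process(InputStream in) throws IOException;
    }

    // Hypothetical stand-in for session.read(flowFile, callback).
    static void read(byte[] content, StreamCallback callback) {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Captures the stream content inside the callback so it can be used afterwards.
    static String readContents(byte[] content) {
        final AtomicReference<String> holder = new AtomicReference<>();
        read(content, new StreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                holder.set(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        });
        return holder.get();
    }

    public static void main(String[] args) {
        System.out.println(readContents("v1,v2".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The holder is needed only because an anonymous class cannot assign to a local variable of the enclosing method; AtomicReference is one standard-library way around that.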

Cheers
Oleg


Re: catch commit error in OnTrigger to diversify session behaviour

Posted by idioma <co...@gmail.com>.
Bryan,
thank you so much, this is absolutely fantastic. I was actually looking for
an easy way to access the content from my load class and I did not know
about ObjectHolder.

Thank you so much




Re: catch commit error in OnTrigger to diversify session behaviour

Posted by Russell Bateman <ru...@perfectsearchcorp.com>.
Oleg,

Agreed. As I started only a few months ago, I have been using 
AtomicReference and it has been reliable and satisfied all my needs. 
(Just sayin'.)

Best


Re: catch commit error in OnTrigger to diversify session behaviour

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
A bit unrelated, but how would you feel about deprecating ObjectHolder so that it can be gone by 1.0?
AtomicReference has been available since Java 5.

Cheers
Oleg


Re: catch commit error in OnTrigger to diversify session behaviour

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

It may be easier to move the load() out of the InputStreamCallback. You
could do something like this...

final ObjectHolder<String> holder = new ObjectHolder<String>(null);

session.read(flowFile, new InputStreamCallback() {

    @Override
    public void process(InputStream in) throws IOException {
        StringWriter strWriter = new StringWriter();
        IOUtils.copy(in, strWriter, "UTF-8");
        String contents = strWriter.toString();
        holder.set(contents);
    }
});

try {
    load(holder.get());
    session.transfer(flowFile, SUCCESS);
} catch (IOException e) {
    session.transfer(flowFile, FAILURE);
}


-Bryan


Re: catch commit error in OnTrigger to diversify session behaviour

Posted by idioma <co...@gmail.com>.
Hi,
I have modified my onTrigger in this way:

 session.read(flowFile, new InputStreamCallback() {

            @Override
            public void process(InputStream in) throws IOException {

                StringWriter strWriter = new StringWriter();
                IOUtils.copy(in, strWriter, "UTF-8");
                String contents = strWriter.toString();

                try {
                    load(contents);
                } catch (IOException e) {
                    e.getMessage();
                    boolean error = true;
                    throw e;
                }
            }
        });

What I am struggling with is how to send it to a failure or a success
depending on the error being thrown. Any help would be appreciated, thank
you so much. 




Re: catch commit error in OnTrigger to diversify session behaviour

Posted by idioma <co...@gmail.com>.
Matt, 
thanks for your reply, but I am not sure I have understood what you
mean. In my load method I have the following:

try {
    transaction.commit();
} catch (TitanException ex) {
    System.out.println("This is a failure message");
    transaction.rollback();
}

How do you suggest re-throwing the exception so that onTrigger can
transfer the flow file to failure, or to success when no exception occurs?
Can you please provide an example?

Thanks,

       





Re: catch commit error in OnTrigger to diversify session behaviour

Posted by Matt Burgess <ma...@gmail.com>.
Upon a TitanException in your load() method, you could wrap it in an IOException and re-throw it. Then, rather than printing the stack trace, you can catch the IOException in onTrigger() and send the flow file to failure. You likely won't want to roll back the session if you're transferring to failure; the user can choose to route the failure relationship however they want (perhaps back to the processor). However, you probably want to penalize the flow file since it caused the error.
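
As a rough sketch of the wrap-and-rethrow idea, in plain Java so that it runs without NiFi or Titan: BackendException and Transaction are hypothetical stand-ins for TitanException and the graph transaction, and route(...) only returns the name of the relationship that onTrigger() would transfer to. In a real processor, the catch branch would presumably call session.penalize(flowFile) and then session.transfer(flowFile, FAILURE).

```java
import java.io.IOException;

class WrapAndRouteSketch {

    // Hypothetical stand-in for TitanException.
    static class BackendException extends RuntimeException {
        BackendException(String message) { super(message); }
    }

    // Hypothetical stand-in for the graph transaction used by load().
    interface Transaction {
        void commit();
        void rollback();
    }

    // load() rolls back the graph transaction, then re-throws the failure
    // wrapped in an IOException instead of swallowing it.
    static void load(String contents, Transaction tx) throws IOException {
        // ... vertices/edges would be created from 'contents' here ...
        try {
            tx.commit();
        } catch (BackendException ex) {
            tx.rollback();
            throw new IOException("Graph commit failed", ex);
        }
    }

    // Mirrors the decision onTrigger() would make after reading the content.
    static String route(String contents, Transaction tx) {
        try {
            load(contents, tx);
            return "success";
        } catch (IOException e) {
            // Real processor (assumption): penalize the flow file, then transfer to FAILURE.
            return "failure";
        }
    }

    public static void main(String[] args) {
        Transaction failing = new Transaction() {
            @Override public void commit() { throw new BackendException("storage unavailable"); }
            @Override public void rollback() { }
        };
        System.out.println(route("sample graph", failing));
    }
}
```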

If the TitanException can be caused by network issues or something else not related to the flow file content itself, you may want to throw different types of exceptions and route the flow files that are affected by such non-flow-file-related errors to a "retry" relationship, indicating that the same operation may work at a later time.
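
One possible shape for that, again as a self-contained sketch rather than actual NiFi code: TransientLoadException, InvalidContentException and the Loader interface are hypothetical names, and the returned strings stand for the relationships ("success", "retry", "failure") a processor would need to declare itself.

```java
class RetryRoutingSketch {

    // Hypothetical: the backend was unreachable; the same content may load later.
    static class TransientLoadException extends Exception {
        TransientLoadException(String message) { super(message); }
    }

    // Hypothetical: the flow file content itself is the problem.
    static class InvalidContentException extends Exception {
        InvalidContentException(String message) { super(message); }
    }

    // Hypothetical stand-in for the processor's load() method.
    interface Loader {
        void load(String contents) throws TransientLoadException, InvalidContentException;
    }

    // Picks a relationship name based on which kind of error occurred.
    static String route(String contents, Loader loader) {
        try {
            loader.load(contents);
            return "success";
        } catch (TransientLoadException e) {
            return "retry";
        } catch (InvalidContentException e) {
            return "failure";
        }
    }

    public static void main(String[] args) {
        Loader networkDown = contents -> {
            throw new TransientLoadException("connection refused");
        };
        System.out.println(route("sample graph", networkDown));
    }
}
```

How a given TitanException maps onto the two categories depends on the storage backend, so that classification is left to load() here.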

Regards,
Matt

