Posted to dev@nifi.apache.org by Sumanth Chinthagunta <xm...@gmail.com> on 2015/10/01 02:43:56 UTC

Re: need help with Async code

Thanks for clarifying the getControllerServiceIdentifiers API.
I have another question:
If I have a processor that is designed to have no side effect on the FlowFile, what is the best/cleanest way to read the content of the FlowFile?
E.g., my processor’s only job is to log the content of the FlowFile. Is there a method like FlowFile.getContentAsString(), or do I have to do what I am doing here?
https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PublishEventBus.java#L102

Thanks
Sumo 

> On Sep 30, 2015, at 2:52 PM, Aldrin Piri <al...@gmail.com> wrote:
> 
> Sumo,
> 
> I did some digging around in your GitHub repo and see that you've migrated
> your ControllerService lookup to your @OnScheduled method, making use of
> the ProcessContext.  This approach is certainly preferable in terms of
> allowing configuration of the Processor than the prior method you outlined
> above.  That identifier is the unique ID generated by the framework for a
> new controller service.  However, to close the trail on the previous path,
> from within init, you would have needed to do something along the lines of:
> 
> context.getControllerServiceLookup().getControllerServiceIdentifiers(
> VertxServiceInterface.class)
> 
> to find all the instances available and then choose one of those
> identifiers, if any were present, at the time the processor was initialized.
> 
> To your second question, I believe you are on the right track, although I
> am not overly familiar with Vertx.  This seems to map quite closely with
> the JMS family of processors (GetJMSTopic [1] in conjunction with its
> abstract parent JmsConsumer [2]). If you find you need more granular
> control of the session, you can create a Processor that extends
> AbstractSessionFactoryProcessor instead of AbstractProcessor.
> 
> Feel free to follow up with any additional questions or details you may
> have.
> 
> Thanks!
> 
> [1]
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java
> [2]
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
> 
> On Sun, Sep 27, 2015 at 3:24 PM, Sumanth Chinthagunta <xmlking@gmail.com>
> wrote:
> 
>> 
>> Hi All,
>> I am new to NiFi and I'm stuck on a couple of issues:
>> 
>> 1. Unable to get hold of a ControllerService from the Processor’s init method.
>>        I wanted to pre-set some dependencies during the init phase instead
>> of querying them repeatedly in the onTrigger method.
>>        I am getting null for the service and am not sure what I have to pass for
>> 'serviceIdentifier'. I couldn't find documentation or examples on how to
>> give an identifier to a service.
>> 
>> 
>>        final VertxServiceInterface vertxService = (VertxServiceInterface)
>> context.getControllerServiceLookup().getControllerService("VertxService");
>> 
>> 
>> https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55
>> 
>> 2. For my use case, I get data published to a topic from the EventBus with
>> the following code.
>> 
>>        EventBus eb = vertx.eventBus();
>> 
>>        eb.consumer("news.uk.sport", message -> {
>>                System.out.println("I have received a message: " +
>> message.body());
>>        });
>> 
>>        I am working on a data ingest processor (push based) that needs to
>> listen for new messages on a topic and send them to the flow as FlowFiles.
>>        In my case the data source is an EventBus that emits messages via a
>> callback API.
>>        I am looking for ideas on how to call the Processor’s onTrigger method
>> when the above callback is invoked.
>>        Do I have to use my own intermediate queue and poll it in the
>> onTrigger method?
>>        Is there a better way to trigger the onTrigger method
>> programmatically?
>> 
>> Thanks
>> Sumo
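
For reference, the intermediate-queue idea raised in question 2 above could look roughly like the following. This is only a minimal sketch using plain JDK classes, not NiFi or Vert.x code: the class name QueueBridgeSketch and the methods onMessage and drainBatch are hypothetical. onMessage stands for whatever the push-based callback would call, and drainBatch stands for the work a processor's onTrigger would do each time the framework schedules it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical bridge between a push-based callback and a polled onTrigger.
class QueueBridgeSketch {

    // Bounded, so a slow flow cannot make the queue grow without limit.
    private final BlockingQueue<String> pending;

    QueueBridgeSketch(int capacity) {
        this.pending = new LinkedBlockingQueue<>(capacity);
    }

    // Called from the source's callback thread. Never blocks; returns false
    // when the queue is full and the message was not accepted.
    boolean onMessage(String body) {
        return pending.offer(body);
    }

    // Stand-in for the body of onTrigger: removes up to maxBatch queued
    // messages so each one can be written out as a FlowFile.
    List<String> drainBatch(int maxBatch) {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        QueueBridgeSketch bridge = new QueueBridgeSketch(2);
        bridge.onMessage("a");
        bridge.onMessage("b");
        boolean accepted = bridge.onMessage("c"); // queue is already full
        System.out.println(accepted);              // prints: false
        System.out.println(bridge.drainBatch(10)); // prints: [a, b]
    }
}
```

Whether to drop, block, or reject when the queue is full depends on the source; the sketch simply reports the rejection to the caller.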


Re: need help with Async code

Posted by Aldrin Piri <al...@gmail.com>.
Sumo,

The callback approach using session.read is the best path.  Ultimately, the
framework provides streams to allow handling data of all sizes to avoid
putting too much into memory.  Accordingly, we do not have any convenience
methods that put content into a given format.
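
To illustrate the callback pattern without requiring NiFi on the classpath, here is a minimal sketch in plain Java. The StreamCallback interface and the read method are hypothetical stand-ins for NiFi's InputStreamCallback and session.read(flowFile, callback); readPrefix and contentAsString are likewise made-up helper names. The point it tries to show is that the stream is only valid inside the callback, and that capping the number of bytes read keeps a logging-only processor from pulling very large content into memory.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for NiFi's InputStreamCallback.
interface StreamCallback {
    void process(InputStream in) throws IOException;
}

class ContentLoggingSketch {

    // Hypothetical stand-in for session.read(flowFile, callback): opens the
    // content stream, hands it to the callback, and closes it afterwards.
    static void read(byte[] content, StreamCallback callback) throws IOException {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        }
    }

    // Reads at most maxBytes from the stream and decodes them as UTF-8.
    // Note: cutting at a byte boundary can split a multi-byte character.
    static String readPrefix(InputStream in, int maxBytes) throws IOException {
        byte[] buffer = new byte[maxBytes];
        int total = 0;
        while (total < maxBytes) {
            int n = in.read(buffer, total, maxBytes - total);
            if (n < 0) {
                break; // end of stream
            }
            total += n;
        }
        return new String(buffer, 0, total, StandardCharsets.UTF_8);
    }

    // Captures up to maxBytes of the content as a String via the callback.
    static String contentAsString(byte[] content, int maxBytes) throws IOException {
        final StringBuilder captured = new StringBuilder();
        read(content, in -> captured.append(readPrefix(in, maxBytes)));
        return captured.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] content = "hello flowfile".getBytes(StandardCharsets.UTF_8);
        System.out.println(contentAsString(content, 5)); // prints: hello
    }
}
```

In an actual processor the same callback body would be passed to session.read, and the captured string handed to the logger after the call returns.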
