You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by Sumanth Chinthagunta <xm...@gmail.com> on 2015/09/27 21:53:09 UTC

nifi question

Hi All,
I am new to NiFi and   I'm stuck with couple issues:  

1. Unable to get hold of ControllerService  from Processor’s init method. 
	I wanted to pre-set some dependencies in my Processor during init phase instead of  querying them repeatedly in onTrigger method. 
	I am getting null for service and not sure what I have to pass for 'serviceIdentifier’ . I couldn't find documentation or examples on how to give Identifier to a service.  


	final VertxServiceInterface vertxService = (VertxServiceInterface) context.getControllerServiceLookup().getControllerService("VertxService”)

	https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55 <https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55>

2. for my usecase I get data published to a topic from EventBus with following code. 

	EventBus eb = vertx.eventBus();

	eb.consumer("news.uk.sport", message -> {
 	 	System.out.println("I have received a message: " + message.body());
	});
	
	I am working on a date ingest processor (push based) that needs to listen for new messages on a topic and send them to flow as FlowFile. 
	In my case data source is EvenBus that expose emit messages via callback API. 
	I am looking for ideas on how to call Processor’s onTrigger method when the above callback is evoked. 
	Should I have to use my own intermediate queue and poll it in onTrigger method? 
	is there a better way to trigger the  onTrigger method programmatically ? 

Thanks 
Sumo

Re: nifi question

Posted by Sumanth Chinthagunta <xm...@gmail.com>.
Great. that is exactly what I needed.

> On Sep 28, 2015, at 2:42 PM, Mark Payne <ma...@hotmail.com> wrote:
> 
> Sumo,
> 
> Generally, the approach that we take is to perform that type of operation in a method that has the @OnScheduled annotation.
> 
> Then you do it only once when the Processor is scheduled.
> 
> If you want to ensure that it happens only one time for the lifecycle of the JVM, you could use a boolean to keep track of whether or
> not the action has been performed. For example:
> 
> private volatile boolean expensiveActionPerformed = false;
> 
> @OnScheduled
> public void doExpensiveSetup(final ProcessContext context) {
>    if (!expensiveActionPerformed) {
> 
>        // do expensive action
> 
>        expensiveActionPerformed = true;
>    }
> }
> 
> 
> Thanks
> -Mark
> 
> 
>> On Sep 28, 2015, at 5:38 PM, Sumanth Chinthagunta <xm...@gmail.com> wrote:
>> 
>> Thanks Mark for pointing me to GetTwitter example. I will try to follow this pattern. 
>> PropertyDescriptor is only available in onTrigger method via ProcessContext. I needed to get hold of Service from  initialize method using ProcessorInitializationContexts as documented in the Developer guide. this helps me to do expensive Service calls once not doing every time onTrigger method is invoked. 
>> 
>> Thanks 
>> Sumo
>> 
>> 
>>> On Sep 28, 2015, at 5:32 AM, Mark Payne <ma...@hotmail.com> wrote:
>>> 
>>> Sumo,
>>> 
>>> The preferred mechanism for obtaining a Controller Service is to do so via a PropertyDescriptor. You would
>>> specify that the property represents a Controller Service by using the identifiesControllerService(Class) method.
>>> This is discussed in the "Interacting with a Controller Service" section of the Developer's Guide [1].
>>> 
>>> In terms of communicating with some asynchronous process, generally the best solution is to use a queue and
>>> then read from that queue in the onTrigger method. You will not be able to call the onTrigger method properly
>>> yourself, as it is designed to be called by the framework at the appropriate time. I would recommend using
>>> a BlockingQueue with a small capacity, as you do not want your Java Heap to fill up with objects from this
>>> Processor if the Processor is stopped for a while. You can look at how this is done in the GetTwitter processor,
>>> if you would like to have an example to look at.
>>> 
>>> Let us know if you have any more questions, or if anything is still not clear!
>>> 
>>> Thanks
>>> -Mark
>>> 
>>> 
>>> [1] http://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#interacting-with-controller-service
>>> 
>>> 
>>> 
>>>> On Sep 27, 2015, at 3:53 PM, Sumanth Chinthagunta <xm...@gmail.com> wrote:
>>>> 
>>>> 
>>>> Hi All,
>>>> I am new to NiFi and   I'm stuck with couple issues:  
>>>> 
>>>> 1. Unable to get hold of ControllerService  from Processor’s init method. 
>>>> 	I wanted to pre-set some dependencies in my Processor during init phase instead of  querying them repeatedly in onTrigger method. 
>>>> 	I am getting null for service and not sure what I have to pass for 'serviceIdentifier’ . I couldn't find documentation or examples on how to give Identifier to a service.  
>>>> 
>>>> 
>>>> 	final VertxServiceInterface vertxService = (VertxServiceInterface) context.getControllerServiceLookup().getControllerService("VertxService”)
>>>> 
>>>> 	https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55 <https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55>
>>>> 
>>>> 2. for my usecase I get data published to a topic from EventBus with following code. 
>>>> 
>>>> 	EventBus eb = vertx.eventBus();
>>>> 
>>>> 	eb.consumer("news.uk.sport", message -> {
>>>> 	 	System.out.println("I have received a message: " + message.body());
>>>> 	});
>>>> 	
>>>> 	I am working on a date ingest processor (push based) that needs to listen for new messages on a topic and send them to flow as FlowFile. 
>>>> 	In my case data source is EvenBus that expose emit messages via callback API. 
>>>> 	I am looking for ideas on how to call Processor’s onTrigger method when the above callback is evoked. 
>>>> 	Should I have to use my own intermediate queue and poll it in onTrigger method? 
>>>> 	is there a better way to trigger the  onTrigger method programmatically ? 
>>>> 
>>>> Thanks 
>>>> Sumo
>>> 
>> 
> 


Re: nifi question

Posted by Mark Payne <ma...@hotmail.com>.
Sumo,

Generally, the approach that we take is to perform that type of operation in a method that has the @OnScheduled annotation.

Then you do it only once when the Processor is scheduled.

If you want to ensure that it happens only one time for the lifecycle of the JVM, you could use a boolean to keep track of whether or
not the action has been performed. For example:

private volatile boolean expensiveActionPerformed = false;

@OnScheduled
public void doExpensiveSetup(final ProcessContext context) {
    if (!expensiveActionPerformed) {

        // do expensive action

        expensiveActionPerformed = true;
    }
}


Thanks
-Mark


> On Sep 28, 2015, at 5:38 PM, Sumanth Chinthagunta <xm...@gmail.com> wrote:
> 
> Thanks Mark for pointing me to GetTwitter example. I will try to follow this pattern. 
> PropertyDescriptor is only available in onTrigger method via ProcessContext. I needed to get hold of Service from  initialize method using ProcessorInitializationContexts as documented in the Developer guide. this helps me to do expensive Service calls once not doing every time onTrigger method is invoked. 
> 
> Thanks 
> Sumo
> 
> 
>> On Sep 28, 2015, at 5:32 AM, Mark Payne <ma...@hotmail.com> wrote:
>> 
>> Sumo,
>> 
>> The preferred mechanism for obtaining a Controller Service is to do so via a PropertyDescriptor. You would
>> specify that the property represents a Controller Service by using the identifiesControllerService(Class) method.
>> This is discussed in the "Interacting with a Controller Service" section of the Developer's Guide [1].
>> 
>> In terms of communicating with some asynchronous process, generally the best solution is to use a queue and
>> then read from that queue in the onTrigger method. You will not be able to call the onTrigger method properly
>> yourself, as it is designed to be called by the framework at the appropriate time. I would recommend using
>> a BlockingQueue with a small capacity, as you do not want your Java Heap to fill up with objects from this
>> Processor if the Processor is stopped for a while. You can look at how this is done in the GetTwitter processor,
>> if you would like to have an example to look at.
>> 
>> Let us know if you have any more questions, or if anything is still not clear!
>> 
>> Thanks
>> -Mark
>> 
>> 
>> [1] http://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#interacting-with-controller-service
>> 
>> 
>> 
>>> On Sep 27, 2015, at 3:53 PM, Sumanth Chinthagunta <xm...@gmail.com> wrote:
>>> 
>>> 
>>> Hi All,
>>> I am new to NiFi and   I'm stuck with couple issues:  
>>> 
>>> 1. Unable to get hold of ControllerService  from Processor’s init method. 
>>> 	I wanted to pre-set some dependencies in my Processor during init phase instead of  querying them repeatedly in onTrigger method. 
>>> 	I am getting null for service and not sure what I have to pass for 'serviceIdentifier’ . I couldn't find documentation or examples on how to give Identifier to a service.  
>>> 
>>> 
>>> 	final VertxServiceInterface vertxService = (VertxServiceInterface) context.getControllerServiceLookup().getControllerService("VertxService”)
>>> 
>>> 	https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55 <https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55>
>>> 
>>> 2. for my usecase I get data published to a topic from EventBus with following code. 
>>> 
>>> 	EventBus eb = vertx.eventBus();
>>> 
>>> 	eb.consumer("news.uk.sport", message -> {
>>> 	 	System.out.println("I have received a message: " + message.body());
>>> 	});
>>> 	
>>> 	I am working on a date ingest processor (push based) that needs to listen for new messages on a topic and send them to flow as FlowFile. 
>>> 	In my case data source is EvenBus that expose emit messages via callback API. 
>>> 	I am looking for ideas on how to call Processor’s onTrigger method when the above callback is evoked. 
>>> 	Should I have to use my own intermediate queue and poll it in onTrigger method? 
>>> 	is there a better way to trigger the  onTrigger method programmatically ? 
>>> 
>>> Thanks 
>>> Sumo
>> 
> 


Re: nifi question

Posted by Sumanth Chinthagunta <xm...@gmail.com>.
Thanks Mark for pointing me to GetTwitter example. I will try to follow this pattern. 
PropertyDescriptor is only available in onTrigger method via ProcessContext. I needed to get hold of Service from  initialize method using ProcessorInitializationContexts as documented in the Developer guide. this helps me to do expensive Service calls once not doing every time onTrigger method is invoked. 

Thanks 
Sumo
 

> On Sep 28, 2015, at 5:32 AM, Mark Payne <ma...@hotmail.com> wrote:
> 
> Sumo,
> 
> The preferred mechanism for obtaining a Controller Service is to do so via a PropertyDescriptor. You would
> specify that the property represents a Controller Service by using the identifiesControllerService(Class) method.
> This is discussed in the "Interacting with a Controller Service" section of the Developer's Guide [1].
> 
> In terms of communicating with some asynchronous process, generally the best solution is to use a queue and
> then read from that queue in the onTrigger method. You will not be able to call the onTrigger method properly
> yourself, as it is designed to be called by the framework at the appropriate time. I would recommend using
> a BlockingQueue with a small capacity, as you do not want your Java Heap to fill up with objects from this
> Processor if the Processor is stopped for a while. You can look at how this is done in the GetTwitter processor,
> if you would like to have an example to look at.
> 
> Let us know if you have any more questions, or if anything is still not clear!
> 
> Thanks
> -Mark
> 
> 
> [1] http://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#interacting-with-controller-service
> 
> 
> 
>> On Sep 27, 2015, at 3:53 PM, Sumanth Chinthagunta <xm...@gmail.com> wrote:
>> 
>> 
>> Hi All,
>> I am new to NiFi and   I'm stuck with couple issues:  
>> 
>> 1. Unable to get hold of ControllerService  from Processor’s init method. 
>> 	I wanted to pre-set some dependencies in my Processor during init phase instead of  querying them repeatedly in onTrigger method. 
>> 	I am getting null for service and not sure what I have to pass for 'serviceIdentifier’ . I couldn't find documentation or examples on how to give Identifier to a service.  
>> 
>> 
>> 	final VertxServiceInterface vertxService = (VertxServiceInterface) context.getControllerServiceLookup().getControllerService("VertxService”)
>> 
>> 	https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55 <https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55>
>> 
>> 2. for my usecase I get data published to a topic from EventBus with following code. 
>> 
>> 	EventBus eb = vertx.eventBus();
>> 
>> 	eb.consumer("news.uk.sport", message -> {
>> 	 	System.out.println("I have received a message: " + message.body());
>> 	});
>> 	
>> 	I am working on a date ingest processor (push based) that needs to listen for new messages on a topic and send them to flow as FlowFile. 
>> 	In my case data source is EvenBus that expose emit messages via callback API. 
>> 	I am looking for ideas on how to call Processor’s onTrigger method when the above callback is evoked. 
>> 	Should I have to use my own intermediate queue and poll it in onTrigger method? 
>> 	is there a better way to trigger the  onTrigger method programmatically ? 
>> 
>> Thanks 
>> Sumo
> 


Re: nifi question

Posted by Mark Payne <ma...@hotmail.com>.
Sumo,

The preferred mechanism for obtaining a Controller Service is to do so via a PropertyDescriptor. You would
specify that the property represents a Controller Service by using the identifiesControllerService(Class) method.
This is discussed in the "Interacting with a Controller Service" section of the Developer's Guide [1].

In terms of communicating with some asynchronous process, generally the best solution is to use a queue and
then read from that queue in the onTrigger method. You will not be able to call the onTrigger method properly
yourself, as it is designed to be called by the framework at the appropriate time. I would recommend using
a BlockingQueue with a small capacity, as you do not want your Java Heap to fill up with objects from this
Processor if the Processor is stopped for a while. You can look at how this is done in the GetTwitter processor,
if you would like to have an example to look at.

Let us know if you have any more questions, or if anything is still not clear!

Thanks
-Mark


[1] http://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#interacting-with-controller-service



> On Sep 27, 2015, at 3:53 PM, Sumanth Chinthagunta <xm...@gmail.com> wrote:
> 
> 
> Hi All,
> I am new to NiFi and   I'm stuck with couple issues:  
> 
> 1. Unable to get hold of ControllerService  from Processor’s init method. 
> 	I wanted to pre-set some dependencies in my Processor during init phase instead of  querying them repeatedly in onTrigger method. 
> 	I am getting null for service and not sure what I have to pass for 'serviceIdentifier’ . I couldn't find documentation or examples on how to give Identifier to a service.  
> 
> 
> 	final VertxServiceInterface vertxService = (VertxServiceInterface) context.getControllerServiceLookup().getControllerService("VertxService”)
> 
> 	https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55 <https://github.com/xmlking/nifi-websocket/blob/master/src/main/java/com/crossbusiness/nifi/processors/PutEventBus.java#L55>
> 
> 2. for my usecase I get data published to a topic from EventBus with following code. 
> 
> 	EventBus eb = vertx.eventBus();
> 
> 	eb.consumer("news.uk.sport", message -> {
> 	 	System.out.println("I have received a message: " + message.body());
> 	});
> 	
> 	I am working on a date ingest processor (push based) that needs to listen for new messages on a topic and send them to flow as FlowFile. 
> 	In my case data source is EvenBus that expose emit messages via callback API. 
> 	I am looking for ideas on how to call Processor’s onTrigger method when the above callback is evoked. 
> 	Should I have to use my own intermediate queue and poll it in onTrigger method? 
> 	is there a better way to trigger the  onTrigger method programmatically ? 
> 
> Thanks 
> Sumo