Posted to dev@nifi.apache.org by idioma <co...@gmail.com> on 2016/03/31 13:20:17 UTC

Import Kafka messages into Titan

Hi,
I am very new to NiFi and I have the following flow:

Consume Messages from Kafka based on a particular topic (JSON format)
->Transform JSON format into some Titan-compliant format -> put them into
Titan/ElasticSearch on AWS

I have done some research and I believe I can use the standard processors
GetKafka and PutElasticSearch for the two "extremes" of the process flow.
Can you confirm this? Would I need to write my own processor? (I am working
with Java.) I feel that I would need to write a Java processor for the actual
transformation from JSON into a graph format. Is that correct? Is there any
suitable resource/project that could be useful to get me going with this?

Thanks,

I.




Re: Import Kafka messages into Titan

Posted by idioma <co...@gmail.com>.
Simon,
thanks for this. This sounds very reasonable. I have a very naive question
on top of my initial one, I am afraid. If I end up using 4 standard processors
(GetKafka -> EvaluateJsonPath -> AttributesToJSON -> PutElastic) from a Java
application, how do I bundle them?

Thanks indeed!




Re: Import Kafka messages into Titan

Posted by idioma <co...@gmail.com>.
Bryan, 
thank you so much, this is exactly what I was looking for. Thank you!




Re: Import Kafka messages into Titan

Posted by Bryan Bende <bb...@gmail.com>.
For #2, you can use templates to move the flow (or parts of it) to another
instance.
A possible approach is to organize the flow into process groups and create
a template per process group, making it potentially easier to update parts
of the flow independently.

This project might be helpful to look at in terms of automating deploying a
template from one instance to another:
https://github.com/aperepel/nifi-api-deploy

For properties that are environment specific, you can specify them in
bootstrap.conf as -D properties for each of your NiFi instances, and in your
processors you can reference them with Expression Language (where the
property supports it).
For example, in each bootstrap.conf there could be -Dkafka.topic=mytopic
and then in a PutKafka processor set the topic to ${kafka.topic}. This will
let your template be the same for each environment.
Unfortunately at a quick glance it looks like GetKafka topic name does not
support EL, which should probably be fixed to allow this.
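
As a rough illustration of the bootstrap.conf approach (the java.arg index is
arbitrary, just pick one that isn't already in use, and the property names are
only an example):

  # conf/bootstrap.conf on each NiFi instance
  java.arg.15=-Dkafka.topic=mytopic

  # PutKafka processor, using Expression Language in the Topic Name property:
  #   Topic Name: ${kafka.topic}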

In the future there is a plan to have a variable registry exposed through
the UI so that you wouldn't have to edit the bootstrap file to define these
properties.


On Thu, Mar 31, 2016 at 11:58 AM, Matt Burgess <ma...@gmail.com> wrote:

> I'll let someone else have a go at question 2 :)
>
> If you're using ExecuteScript with Groovy, you don't need
> EvaluateJsonPath; Groovy has a JsonSlurper that works nicely (examples on
> my blog).
>
> To put data directly into Titan you don't need to convert the format;
> instead you'll want Gremlin (part of Apache TinkerPop). Point your processor's
> Module Path property at a folder containing the Gremlin JARs, then you can
> create the vertices and edges using the approach in the Titan documentation.
>
> This would make an excellent blog post, perhaps I'll give this a try
> myself but please feel welcome to share anything you learn along the way!
> If I get some spare time I'd like to write a PutGraph processor that does
> pretty much what we've outlined here.
>
> Regards,
> Matt
>
> Sent from my iPhone
>
> > On Mar 31, 2016, at 10:15 AM, idioma <co...@gmail.com> wrote:
> >
> > Matt, thank you for this, this is brilliant. So, as it is, I am thinking
> > that I would like to use the following:
> >
> > GetKafka -> EvaluateJsonPath -> ExecuteScript+Groovy Script
> >
> > I have two questions:
> >
> > 1) How do I import the Titan-compliant file into Titan? I guess I can
> > modify the script and load it in.
> > 2) My second question is more naive and reflects the fact that my background
> > is more in Apache Camel, with very little knowledge of NiFi. In a
> > version-controlled environment, how do you push a process flow created with
> > NiFi that mostly involves standard components? Do you write a customized
> > version where you set the Kafka properties, for example?
> >
> > Thanks
> >
> >
> >
>

Re: Import Kafka messages into Titan

Posted by idioma <co...@gmail.com>.
Thanks once again Matt, but I wonder whether we can make it even easier.

GetKafka -> Custom Processor that will use the GraphSON Reader lib
(https://github.com/tinkerpop/blueprints/wiki/GraphSON-Reader-and-Writer-Library)
-> Custom PutTitan Processor that will insert the graph into Titan. Does that
actually sound reasonable? What are your thoughts?

So, I will have two processors, in a separation-of-concerns fashion: one to
read the JSON into GraphSON and one that will only point to my processor's
Module Path property (how do you do that?) and insert the data.

Thanks,

Ilaria




Re: Import Kafka messages into Titan

Posted by Matt Burgess <ma...@gmail.com>.
I'll let someone else have a go at question 2 :)

If you're using ExecuteScript with Groovy, you don't need EvaluateJsonPath; Groovy has a JsonSlurper that works nicely (examples on my blog).

To put data directly into Titan you don't need to convert the format; instead you'll want Gremlin (part of Apache TinkerPop). Point your processor's Module Path property at a folder containing the Gremlin JARs, then you can create the vertices and edges using the approach in the Titan documentation.
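
Just as a rough, untested sketch (assuming Titan 1.0 / TinkerPop 3 style APIs on
the Module Path; the properties file path and the JSON field names are made up),
the Groovy body in ExecuteScript could look something like:

  import groovy.json.JsonSlurper
  import org.apache.nifi.processor.io.InputStreamCallback
  import com.thinkaurelius.titan.core.TitanFactory
  import org.apache.tinkerpop.gremlin.structure.T

  def flowFile = session.get()
  if (!flowFile) return

  // Parse the incoming Kafka message (JSON) from the flow file content
  def json = null
  session.read(flowFile, { inputStream ->
      json = new JsonSlurper().parse(inputStream)
  } as InputStreamCallback)

  // Open Titan and create a vertex from the message; in a real flow you would
  // open the graph once rather than per flow file
  def graph = TitanFactory.open('/path/to/titan.properties')
  def vertex = graph.addVertex(T.label, 'message')
  vertex.property('name', json.name)   // example field only
  graph.tx().commit()
  graph.close()

  session.transfer(flowFile, REL_SUCCESS)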

This would make an excellent blog post, perhaps I'll give this a try myself but please feel welcome to share anything you learn along the way! If I get some spare time I'd like to write a PutGraph processor that does pretty much what we've outlined here.

Regards,
Matt

Sent from my iPhone

> On Mar 31, 2016, at 10:15 AM, idioma <co...@gmail.com> wrote:
> 
> Matt, thank you for this, this is brilliant. So, as it is, I am thinking that I
> would like to use the following:
> 
> GetKafka -> EvaluateJsonPath -> ExecuteScript+Groovy Script
> 
> I have two questions:
> 
> 1) How do I import the Titan-compliant file into Titan? I guess I can modify
> the script and load it in.
> 2) My second question is more naive and reflects the fact that my background is
> more in Apache Camel, with very little knowledge of NiFi. In a version-controlled
> environment, how do you push a process flow created with NiFi that mostly
> involves standard components? Do you write a customized version where you set
> the Kafka properties, for example?
> 
> Thanks
> 
> 
> 

Re: Import Kafka messages into Titan

Posted by idioma <co...@gmail.com>.
Matt, thank you for this, this is brilliant. So, as it is, I am thinking that I
would like to use the following:

GetKafka -> EvaluateJsonPath -> ExecuteScript+Groovy Script

I have two questions:

1) How do I import the Titan-compliant file into Titan? I guess I can modify
the script and load it in.
2) My second question is more naive and reflects the fact that my background is
more in Apache Camel, with very little knowledge of NiFi. In a version-controlled
environment, how do you push a process flow created with NiFi that mostly
involves standard components? Do you write a customized version where you set
the Kafka properties, for example?

Thanks




Re: Import Kafka messages into Titan

Posted by Matt Burgess <ma...@gmail.com>.
Idioma,

There is not yet a JSON-to-JSON translator, although there is a Jira case
to add one (https://issues.apache.org/jira/browse/NIFI-361). However, you have
a handful of options here:

1) If you don't have a specific output format in mind, use Simon's approach
to generate a JSON file with the desired attributes.
2) Get the appropriate JSON elements into attributes using
EvaluateJsonPath, then ReplaceText for the new format
3) Use EvaluateJsonPath as above, then use Uwe's template processor (
https://github.com/uwegeercken/nifi_processors)
4) Use ExecuteScript and write a script in Groovy, JavaScript, Jython,
JRuby, or Lua to do the custom translation
5) Write your own processor (and please feel welcome to share it with the
community!)

If you go with Option 4, I have some Groovy code that builds a
TinkerPop 3-compliant GraphSON document from NiFi provenance events; the
builder pattern might be useful to you if you need GraphSON for the Titan
backend. The code is available as a Gist (
https://gist.github.com/mattyb149/a43a5c12c39701c4a2feeed71a57c66c), and I
have other examples of JSON-to-JSON conversion on my blog (
funnifi.blogspot.com).
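
Just to give a flavor of option 4, a minimal ExecuteScript (Groovy) body for a
JSON-to-JSON rewrite might look like the following; the field names and the
output shape are invented, so adjust them to your Kafka messages and to
whatever Titan needs:

  import groovy.json.JsonOutput
  import groovy.json.JsonSlurper
  import java.nio.charset.StandardCharsets
  import org.apache.nifi.processor.io.StreamCallback

  def flowFile = session.get()
  if (!flowFile) return

  flowFile = session.write(flowFile, { inputStream, outputStream ->
      def json = new JsonSlurper().parse(inputStream)
      // Re-shape the incoming message; this mapping is only an example
      def out = [label: json.type, properties: [name: json.name]]
      outputStream.write(JsonOutput.toJson(out).getBytes(StandardCharsets.UTF_8))
  } as StreamCallback)

  session.transfer(flowFile, REL_SUCCESS)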

Regards,
Matt

On Thu, Mar 31, 2016 at 7:20 AM, idioma <co...@gmail.com> wrote:

> Hi,
> I am very new to NiFi and I have the following flow:
>
> Consume Messages from Kafka based on a particular topic (JSON format)
> ->Transform JSON format into some Titan-compliant format -> put them into
> Titan/ElasticSearch on AWS
>
> I have done some research and I believe I can use the standard processors
> GetKafka and PutElasticSearch for the two "extremes" of the process flow.
> Can you confirm this? Would I need to write my own processor? (I am working
> with Java.) I feel that I would need to write a Java processor for the actual
> transformation from JSON into a graph format. Is that correct? Is there any
> suitable resource/project that could be useful to get me going with this?
>
> Thanks,
>
> I.
>
>
>
>


On Thu, Mar 31, 2016 at 8:31 AM, Simon Ball <sb...@hortonworks.com> wrote:

> You don’t necessarily need a custom processor for this. To convert the
> JSON to key values for a graph, for example, you can use EvaluateJsonPath on
> your incoming messages from Kafka; this will pull out the pieces you need.
> Then use AttributesToJSON to assemble these attributes back into JSON to push
> to the PutElastic processor.
>
> Simon
>
> > On 31 Mar 2016, at 12:20, idioma <co...@gmail.com> wrote:
> >
> > Hi,
> > I am very new to NiFi and I have the following flow:
> >
> > Consume Messages from Kafka based on a particular topic (JSON format)
> > ->Transform JSON format into some Titan-compliant format -> put them into
> > Titan/ElasticSearch on AWS
> >
> > I have done some research and I believe I can use the standard processors
> > GetKafka and PutElasticSearch for the two "extremes" of the process flow.
> > Can you confirm this? Would I need to write my own processor? (I am working
> > with Java.) I feel that I would need to write a Java processor for the actual
> > transformation from JSON into a graph format. Is that correct? Is there any
> > suitable resource/project that could be useful to get me going with this?
> >
> > Thanks,
> >
> > I.
> >
> >
> >
> >
>
>

Re: Import Kafka messages into Titan

Posted by Simon Ball <sb...@hortonworks.com>.
You don’t necessarily need a custom processor for this. To convert the JSON to key values for a graph, for example, you can use EvaluateJsonPath on your incoming messages from Kafka; this will pull out the pieces you need. Then use AttributesToJSON to assemble these attributes back into JSON to push to the PutElastic processor.
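
For example (the JSON paths and attribute names below are just placeholders for
whatever your messages actually contain):

  EvaluateJsonPath
    Destination          = flowfile-attribute
    user.name  (dynamic) = $.user.name
    user.email (dynamic) = $.user.email

  AttributesToJSON
    Attributes List = user.name, user.email
    Destination     = flowfile-content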

Simon

> On 31 Mar 2016, at 12:20, idioma <co...@gmail.com> wrote:
> 
> Hi,
> I am very new to NiFi and I have the following flow:
> 
> Consume Messages from Kafka based on a particular topic (JSON format)
> ->Transform JSON format into some Titan-compliant format -> put them into
> Titan/ElasticSearch on AWS
> 
> I have done some research and I believe I can use the standard processors
> GetKafka and PutElasticSearch for the two "extremes" of the process flow.
> Can you confirm this? Would I need to write my own processor? (I am working
> with Java.) I feel that I would need to write a Java processor for the actual
> transformation from JSON into a graph format. Is that correct? Is there any
> suitable resource/project that could be useful to get me going with this?
> 
> Thanks,
> 
> I.
> 
> 
> 
>