Posted to users@nifi.apache.org by Parul Agrawal <pa...@gmail.com> on 2015/10/07 08:50:23 UTC

Need help in nifi- flume processor

Hi,

I was trying to run the NiFi Flume processor with the details mentioned
below, but could not bring it up.

I had already started Flume with the sample configuration file:
=============================================
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
=============================================

Command used to start Flume: $ bin/flume-ng agent --conf conf
--conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

In the NiFi UI, the ExecuteFlumeSink processor was configured as follows:
Property           Value
Sink Type         logger
Agent Name      a1
Sink Name         k1

An event is sent to Flume using:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

But I could not get any data in the NiFi Flume processor. Could you
please help me with this?
Do I need to change Flume's example.conf so that the NiFi Flume
sink receives the data?

Thanks and Regards,
Parul

Re: Need help in nifi- flume processor

Posted by Joey Echeverria <jo...@gmail.com>.
Can you share the source code to the custom source?

-Joey
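For context, the IllegalStateException in the log below comes from Flume's transaction contract: close() is only legal after commit() or rollback(). A tiny Python model of that state machine (an illustration, not Flume's actual code) reproduces the rule:

```python
class Transaction:
    """Toy model of Flume's BasicTransactionSemantics state machine.

    Illustration only: close() is legal only after commit() or
    rollback(), which is exactly the contract the exception in the
    quoted log complains about.
    """
    def __init__(self):
        self.state = "NEW"

    def begin(self):
        assert self.state == "NEW"
        self.state = "OPEN"

    def commit(self):
        assert self.state == "OPEN"
        self.state = "COMPLETED"

    def rollback(self):
        assert self.state == "OPEN"
        self.state = "COMPLETED"

    def close(self):
        if self.state == "OPEN":
            raise RuntimeError("close() called when transaction is OPEN - "
                               "you must either commit or rollback first")
        self.state = "CLOSED"
```

A source or sink must therefore commit or roll back every transaction it begins before closing it, including on error paths.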

> On Oct 10, 2015, at 03:34, Parul Agrawal <pa...@gmail.com> wrote:
> 
> Hi,
> 
> I added a custom Flume source, and when the source sends data to the Flume sink, the error below is thrown at the sink.
> 
>  Administratively Yielded for 1 sec due to processing failure
> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught Exception: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:172) ~[guava-r05.jar:na]
>         at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) ~[flume-ng-core-1.6.0.jar:1.6.0]
>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) ~[flume-ng-core-1.6.0.jar:1.6.0]
>         at org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139) ~[na:na]
>         at org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148) ~[na:na]
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_85]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_85]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_85]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_85]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_85]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_85]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.flume.ExecuteFlumeSink ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318])
> 
> Any idea what could be wrong here?
> 
> Thanks and Regards,
> Parul
> 
> 
>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>> Hi Parul,
>> 
>> I think it would be good to keep the convo going on the users list since there are more people who can offer help there, and also helps everyone learn new solutions.
>> 
>> The quick answer though is that NiFi has an ExecuteProcess processor which could execute "tshark -i eth0 -T pdml". 
>> 
>> There is not currently an XmlToJson processor, so this could be a place where you need a custom processor. For simple cases you can use an EvaluateXPath processor to extract values from the XML, and then a ReplaceText processor to build a new json document from those extracted values.
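As a rough illustration of the extract-then-rebuild idea Bryan describes, outside NiFi (the field names and structure here are made up; real tshark PDML is much richer):

```python
import json
import xml.etree.ElementTree as ET

# Hypothetical, simplified packet; real tshark PDML is much richer.
pdml = """<packet>
  <proto name="ip">
    <field name="ip.src" show="10.0.0.1"/>
    <field name="ip.dst" show="10.0.0.2"/>
  </proto>
</packet>"""

root = ET.fromstring(pdml)
# Step 1 (EvaluateXPath-like): pull out the interesting values.
record = {f.get("name"): f.get("show") for f in root.findall(".//field")}
# Step 2 (ReplaceText-like): rebuild them as a JSON document.
print(json.dumps(record))
```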
>> 
>> -Bryan
>> 
>> 
>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>> Hi,
>>> 
>>> A little more to add: I need to keep reading the flowfile until END_TAG is
>>> received, i.e. we may need to concatenate the flowfile data until END_TAG,
>>> then convert it to JSON and send it to the PutFile processor.
>>> 
>>> Thanks and Regards,
>>> Parul
>>> 
>>> 
>>> 
>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>>> Hi,
>>>> 
>>>> Thank you very much again for the guidance provided.
>>>> Basically I would need a processor that converts an XML file to JSON.
>>>> 
>>>> Currently I have a flume source which is of type "exec" and the command used is "tshark -i eth0 -T pdml".
>>>> 
>>>> Here Flume source keeps sending data to flume sink. This flow file would be of PDML format.
>>>> 
>>>> Now I need a processor which would do the following
>>>> 
>>>> 1) Form a complete XML file based on the START TAG (<packet>)
>>>> and the END TAG (</packet>).
>>>> 2) Once the XML message is formed, convert it to JSON.
>>>> 3) Place the JSON file in a local directory using the PutFile processor.
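Step 1 above can be sketched as a simple line accumulator (illustrative Python only, assuming each tag sits on its own line, which real tshark PDML output may not guarantee):

```python
def packets(lines):
    """Group a line stream into complete <packet>...</packet> documents.

    Simplifying assumption: the opening and closing tags each appear
    on their own line.
    """
    buf, inside = [], False
    for line in lines:
        if "<packet>" in line:
            buf, inside = [line], True
        elif "</packet>" in line and inside:
            buf.append(line)
            yield "\n".join(buf)
            buf, inside = [], False
        elif inside:
            buf.append(line)

stream = ["header junk", "<packet>", "<proto/>", "</packet>",
          "<packet>", "</packet>"]
docs = list(packets(stream))
```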
>>>> 
>>>> I am not sure whether I was able to explain the processor requirement clearly.
>>>> Would be really great if you could help me in this.
>>>> 
>>>> Thanks and Regards,
>>>> Parul 
>>>> 
>>>> 
>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com> wrote:
>>>>> > If you plan to use NiFi for the long term, it might be worth investing in converting your custom Flume components to NiFi processors. We can help you get started if you need any guidance going that route.
>>>>> 
>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>> execution model.
>>>>> 
>>>>> -Joey
>>>>> 
>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Parul,
>>>>> >
>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due to the way the Flume processors load the classes for the sources and sinks, the jar you deploy to the lib directory also needs to include the other dependencies your source/sink needs (or they each need to individually be in lib/ directly).
>>>>> >
>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>> > https://github.com/bbende/my-flume-source
>>>>> >
>>>>> > It will contain the custom source and following dependencies all in one jar:
>>>>> >
>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>> >
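A shaded jar like the one Bryan describes is typically produced with the maven-shade-plugin; a minimal stanza (an assumption here — check the sample project's pom.xml for the exact setup) looks like:

```xml
<!-- Hypothetical maven-shade-plugin stanza; the linked sample project
     may differ in detail. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
    </execution>
  </executions>
</plugin>
```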
>>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource processor with the following config:
>>>>> >
>>>>> > Source Type = org.apache.flume.MySource
>>>>> > Agent Name = a1
>>>>> > Source Name = r1
>>>>> > Flume Configuration = a1.sources = r1
>>>>> >
>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>> >
>>>>> > Keep in mind that this could become risky because any classes found in the lib directory would be accessible to all NARs in NiFi and would be found before classes within a NAR because the parent is checked first during class loading. This example isn't too risky because we are only bringing in flume jars and one guava jar, but for example if another nar uses a different version of guava this is going to cause a problem.
>>>>> >
>>>>> > If you plan to use NiFi for the long term, it might be worth investing in converting your custom Flume components to NiFi processors. We can help you get started if you need any guidance going that route.
>>>>> >
>>>>> > -Bryan
>>>>> >
>>>>> >
>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hello Bryan,
>>>>> >>
>>>>> >> Thank you very much for your response.
>>>>> >>
>>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>>> >> I have my own customized source and sink. I followed the steps below to add my own customized source, but it did not work.
>>>>> >>
>>>>> >> 1) Created Maven project and added customized source. (flume.jar was created after this step)
>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>> >> 3) Added flume source processor with the below configuration
>>>>> >>
>>>>> >> Property           Value
>>>>> >> Source Type         com.flume.source.Source
>>>>> >> Agent Name      a1
>>>>> >> Source Name         k1.
>>>>> >>
>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>> >>
>>>>> >> Can you please help me in this regard? Is there any step or configuration I missed?
>>>>> >>
>>>>> >> Thanks and Regards,
>>>>> >> Parul
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Hello,
>>>>> >>>
>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks with in NiFi. They don't communicate with an external Flume process.
>>>>> >>>
>>>>> >>> In your example you would need an ExecuteFlumeSource configured to run the netcat source, connected to a ExecuteFlumeSink configured with the logger.
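Concretely, that ExecuteFlumeSource might be configured like this (a sketch based on the property names used elsewhere in this thread):

```
Source Type         = netcat
Agent Name          = a1
Source Name         = r1
Flume Configuration = a1.sources = r1
                      a1.sources.r1.type = netcat
                      a1.sources.r1.bind = localhost
                      a1.sources.r1.port = 44444
```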
>>>>> >>>
>>>>> >>> -Bryan
>>>>> >>>
>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <pa...@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Hi,
>>>>> >>>>
>>>>> >>>> I was trying to run Nifi Flume processor with the below mentioned
>>>>> >>>> details but not could bring it up.
>>>>> >>>>
>>>>> >>>> I already started flume with the sample configuration file
>>>>> >>>> =============================================
>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>> >>>>
>>>>> >>>> # Name the components on this agent
>>>>> >>>> a1.sources = r1
>>>>> >>>> a1.sinks = k1
>>>>> >>>> a1.channels = c1
>>>>> >>>>
>>>>> >>>> # Describe/configure the source
>>>>> >>>> a1.sources.r1.type = netcat
>>>>> >>>> a1.sources.r1.bind = localhost
>>>>> >>>> a1.sources.r1.port = 44444
>>>>> >>>>
>>>>> >>>> # Describe the sink
>>>>> >>>> a1.sinks.k1.type = logger
>>>>> >>>>
>>>>> >>>> # Use a channel which buffers events in memory
>>>>> >>>> a1.channels.c1.type = memory
>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>> >>>>
>>>>> >>>> # Bind the source and sink to the channel
>>>>> >>>> a1.sources.r1.channels = c1
>>>>> >>>> a1.sinks.k1.channel = c1
>>>>> >>>> =============================================
>>>>> >>>>
>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>> >>>> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>>> >>>>
>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration was done:
>>>>> >>>> Property           Value
>>>>> >>>> Sink Type         logger
>>>>> >>>> Agent Name      a1
>>>>> >>>> Sink Name         k1.
>>>>> >>>>
>>>>> >>>> Event is sent to the flume using:
>>>>> >>>> $ telnet localhost 44444
>>>>> >>>> Trying 127.0.0.1...
>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>> >>>> Escape character is '^]'.
>>>>> >>>> Hello world! <ENTER>
>>>>> >>>> OK
>>>>> >>>>
>>>>> >>>> But I could not get any data in the nifi flume processor. Request your
>>>>> >>>> help in this.
>>>>> >>>> Do i need to change the example.conf file of flume so that Nifi Flume
>>>>> >>>> Sink should get the data.
>>>>> >>>>
>>>>> >>>> Thanks and Regards,
>>>>> >>>> Parul
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>> Sent from Gmail Mobile
>>>>> >>
>>>>> >>
>>>>> >
> 

Re: Need help in nifi- flume processor

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

It seems like NiFi is expecting sql.args.1.value to be a numeric type and
trying to parse it into a number.

Can you use NiFi's provenance feature to look at one of the flow files that
produced this error, and let us know what the value of sql.args.1.type is?

You can do this by copying the id of the PutSQL processor from the
Configure/Settings page, and then going to the Provenance page from the
icon in the top right. From there use the search button and paste in the
processor id into the Component ID and do a search. When you get the events
back click on the icon in the first column for one of the events and then
look at the Attributes tab, this is where you should see all the sql
attributes.
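If sql.args.1.type does turn out to be the JDBC TIMESTAMP type, one workaround is to convert the tshark-style timestamp string into epoch milliseconds before it reaches PutSQL. A standalone sketch of that conversion (illustrative Python, with a hand-rolled timezone table as an explicit assumption, since abbreviations like CST are ambiguous):

```python
from datetime import datetime, timezone, timedelta

def tshark_ts_to_epoch_millis(ts: str) -> int:
    """Convert 'Nov  4, 2015 00:42:15.000000000 CST' to epoch millis.

    Assumptions: the trailing token is a timezone abbreviation, and we
    map the few we expect by hand rather than rely on the platform.
    """
    tz_hours = {"CST": -6, "CDT": -5, "UTC": 0, "GMT": 0}
    head, tz_abbr = ts.rsplit(" ", 1)
    date_part, frac = head.rsplit(".", 1)
    micros = int(frac[:6].ljust(6, "0"))  # nanoseconds -> microseconds
    dt = datetime.strptime(date_part, "%b %d, %Y %H:%M:%S")
    dt = dt.replace(microsecond=micros,
                    tzinfo=timezone(timedelta(hours=tz_hours[tz_abbr])))
    return int(dt.timestamp() * 1000)
```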

-Bryan

On Wed, Nov 4, 2015 at 1:59 AM, Parul Agrawal <pa...@gmail.com>
wrote:

> Hi Bryan,
>
>
> I am trying to insert the following data into the database using the NiFi
> processors ConvertJsonToSql and PutSQL.
>
> JSON object used:
>  {"index":"1", "num":"1", "len":"58", "caplen":"54", "timestamp":"Nov
>  4, 2015 00:42:15.000000000 CST"}
>
> Kindly find the table description:
>
> maddb=# \d gen_info;
>                                      Table "public.gen_info"
>   Column   |           Type           |                        Modifiers
>
> -----------+--------------------------+----------------------------------------------------------
>  index     | bigint                   | not null default
> nextval('gen_info_index_seq'::regclass)
>  num       | integer                  |
>  len       | integer                  |
>  caplen    | integer                  |
>  timestamp | timestamp with time zone |
>
> Here, while inserting the timestamp 'Nov  4, 2015 00:42:15.000000000 CST',
> the following exception is thrown by the NiFi processor:
>
> 2015-11-04 00:42:16,798 ERROR [Timer-Driven Process Thread-10]
> o.apache.nifi.processors.standard.PutSQL
> PutSQL[id=04197ba8-25a5-4301-ad00-2cc139972877] Cannot update database for
> StandardFlowFileRecord[uuid=b32687d6-b814-4063-83e4-27af8f16be0b,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1446619336301-1, container=default,
> section=1], offset=8816,
> length=70],offset=0,name=10468777551217095,size=70] due to
> org.apache.nifi.processor.exception.ProcessException: The value of the
> sql.args.1.value is 'Nov  4, 2015 00:42:15.000000000 CST', which cannot
> be converted into the necessary data type; routing to failure:
> org.apache.nifi.processor.exception.ProcessException: The value of the
> sql.args.1.value is 'Nov  4, 2015 00:42:15.000000000 CST', which cannot be
> converted into the necessary data type.
>
> Manual insertion into the DB works fine, but with NiFi I am getting the
> above exception. Also, if the timestamp is in string format, no error is thrown.
> Can you please help me in this regard?
>
> Thank you very much for all the guidance and support provided so far.
>
> Thanks and Regards,
> Parul
>
>
>
> On Mon, Oct 26, 2015 at 5:14 PM, Bryan Bende <bb...@gmail.com> wrote:
>
>> Hello,
>>
>> Can you tell us what you are trying to route on in the json? What regular
>> expression did you try in RouteOnContent?
>>
>> -Bryan
>>
>>
>> On Monday, October 26, 2015, Parul Agrawal <pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thank you very much for all the support.
>>> I have written a custom processor to split one JSON document into multiple
>>> JSON documents. Now I would like to route each flowfile based on the
>>> content it contains.
>>> I tried using RouteOnContent, but it did not work.
>>>
>>> Can you please explain how I can route a flowfile based on the
>>> content/data it contains?
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>>
>>> On Tue, Oct 13, 2015 at 6:54 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>
>>>> Parul,
>>>>
>>>> You can use SplitJson to take a large JSON document and split an array
>>>> element into individual documents. I took the json you attached and created
>>>> a flow like GetFile -> SplitJson -> SplitJson -> PutFile
>>>>
>>>> In the first SplitJson the path I used was $.packet.proto and in the
>>>> second I used $.field. This seemed to mostly work, except some of the splits
>>>> going into PutFile still have another level of "field" which needs to be
>>>> split again so would possibly need some conditional logic to split certain
>>>> documents again.
>>>>
>>>> Alternatively you could write a custom processor that restructures your
>>>> JSON.
>>>>
>>>> -Bryan
>>>>
>>>>
>>>>
>>>> On Tue, Oct 13, 2015 at 8:36 AM, Parul Agrawal <
>>>> parulagrawal14@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I tried with the above json element. But I am getting the below
>>>>> mentioned error:
>>>>>
>>>>> 2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
>>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
>>>>> StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
>>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>>> section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
>>>>> due to org.apache.nifi.processor.exception.ProcessException: IOException
>>>>> thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
>>>>> org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
>>>>> 73)): expected a valid value (number, String, array, object, 'true',
>>>>> 'false' or 'null')
>>>>>
>>>>> Also, I have a huge JSON object attached (new.json). Can you guide me
>>>>> on how to use the ConvertJSONToSQL processor?
>>>>> Should I use any other processor before ConvertJSONToSQL
>>>>> so that new.json can be converted into a flat document
>>>>> of key/value pairs, or an array of flat documents?
>>>>>
>>>>> Any help/guidance would be really useful.
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
>>>>>
>>>>> On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bb...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think ConvertJSONToSQL expects a flat document of key/value pairs,
>>>>>> or an array of flat documents. So I think your JSON would be:
>>>>>>
>>>>>> [
>>>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>>>> ]
>>>>>>
>>>>>> The table name will come from the Table Name property.
>>>>>>
>>>>>> Let us know if this doesn't work.
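If the incoming JSON is nested (like the PDML-derived documents earlier in this thread), it has to be flattened into key/value pairs first. A generic flattening sketch (illustrative Python, not a NiFi processor):

```python
def flatten(obj, prefix=""):
    """Flatten nested dicts/lists into dot-keyed scalar pairs."""
    out = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            out.update(flatten(v, prefix + str(k) + "."))
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            out.update(flatten(v, prefix + str(i) + "."))
    else:
        out[prefix[:-1]] = obj
    return out

flat = flatten({"packet": {"proto": [{"name": "ip"}]}})
```

The flat dictionary's keys would then need to match (or be renamed to match) the target table's column names.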
>>>>>>
>>>>>> -Bryan
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <
>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thank you very much for all the support.
>>>>>>> I was able to convert the XML format to JSON using a custom Flume
>>>>>>> source.
>>>>>>>
>>>>>>> Now I need the ConvertJSONToSQL processor to insert data into SQL.
>>>>>>> I am getting hands-on with this processor and will update you on
>>>>>>> my progress.
>>>>>>> Meanwhile, if you could share an example of using this processor on
>>>>>>> sample JSON data, that would be great.
>>>>>>>
>>>>>>> ===============
>>>>>>>
>>>>>>> 1) I tried using ConvertJSONToSQL processor with the below sample
>>>>>>> json file:
>>>>>>>
>>>>>>> "details":[
>>>>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>>>>> ]
>>>>>>>
>>>>>>> 2) I created the table details in PostgreSQL:
>>>>>>>  select * from details;
>>>>>>>  firstname | lastname
>>>>>>> -----------+----------
>>>>>>> (0 rows)
>>>>>>>
>>>>>>> 3) ConvertJSONToSQL processor property details are as below:
>>>>>>> Property                     Value
>>>>>>> JDBC Connection Pool         DBCPConnectionPool
>>>>>>> Statement Type               INSERT
>>>>>>> Table Name                   details
>>>>>>> Catalog Name                 No value set
>>>>>>> Translate Field Names        false
>>>>>>> Unmatched Field Behavior     Ignore Unmatched Fields
>>>>>>> Update Keys                  No value set
>>>>>>>
>>>>>>> But I am getting the below mentioned error in ConvertJSONToSQL
>>>>>>> Processor.
>>>>>>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>>>>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>>>>>>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>>>>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>>>>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>>>>>>> SQL INSERT statement due to
>>>>>>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>>>>>>> the JSON map to the columns defined by the details table; routing to
>>>>>>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>>>>>>> fields in the JSON map to the columns defined by the details table
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Parul
>>>>>>>
>>>>>>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I've done something like this by wrapping the command in a shell
>>>>>>>> script:
>>>>>>>>
>>>>>>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>>>>>>
>>>>>>>> My use case was slightly different, but I'm pretty sure you can
>>>>>>>> adapt the same idea.
>>>>>>>>
>>>>>>>> -Joey
>>>>>>>>
>>>>>>>> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I actually need to get the data from pipe.
>>>>>>>> So the actual command I would need is mkfifo /tmp/packet;tshark -i
>>>>>>>> ens160 -T pdml >/tmp/packet.
>>>>>>>> Is it possible to use ExecuteProcess with multiple commands?
>>>>>>>>
>>>>>>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <
>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I added custom flume source and when flume source is sending the
>>>>>>>>> data to flume sink, below mentioned error is thrown at flume sink.
>>>>>>>>>
>>>>>>>>>  Administratively Yielded for 1 sec due to processing failure
>>>>>>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>>>>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>>>>>>> is OPEN - you must either commit or rollback first
>>>>>>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>>>>>>> java.lang.IllegalStateException: close() called when transaction
>>>>>>>>> is OPEN - you must either commit or rollback first
>>>>>>>>>         at
>>>>>>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>>>>>>> ~[guava-r05.jar:na]
>>>>>>>>>         at
>>>>>>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>>>>         at
>>>>>>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>>>>         at
>>>>>>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>>>>>>> ~[na:na]
>>>>>>>>>         at
>>>>>>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>>>>>>> ~[na:na]
>>>>>>>>>         at
>>>>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>>>>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>>         at
>>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>>         at
>>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>>         at
>>>>>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>>>>>> [na:1.7.0_85]
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>>>>>> [na:1.7.0_85]
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>>>>>> [na:1.7.0_85]
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>> [na:1.7.0_85]
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>> [na:1.7.0_85]
>>>>>>>>>         at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>> [na:1.7.0_85]
>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>>>>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>>>>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>>>>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>>>>> section=7], offset=180436,
>>>>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>>>>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>>>>> section=7], offset=180436,
>>>>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>>>>> this session (StandardProcessSession[id=218318])
>>>>>>>>>
>>>>>>>>> Any idea what could be wrong in this.
>>>>>>>>>
>>>>>>>>> Thanks and Regards,
>>>>>>>>> Parul
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Parul,
>>>>>>>>>>
>>>>>>>>>> I think it would be good to keep the convo going on the users
>>>>>>>>>> list since there are more people who can offer help there, and also helps
>>>>>>>>>> everyone learn new solutions.
>>>>>>>>>>
>>>>>>>>>> The quick answer though is that NiFi has an ExecuteProcess
>>>>>>>>>> processor which could execute "tshark -i eth0 -T pdml".
>>>>>>>>>>
>>>>>>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>>>>>>> place where you need a custom processor. For simple cases you can use an
>>>>>>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>>>>>>> ReplaceText processor to build a new json document from those extracted
>>>>>>>>>> values.
>>>>>>>>>>
>>>>>>>>>> -Bryan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <
>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Little more to add.....
>>>>>>>>>>>  I need to keep reading the flowfile till END_TAG is received.
>>>>>>>>>>> i.e. we may need to concatenate the flowfile data till END_TAG.
>>>>>>>>>>> and then convert it to json and call PutFile() processor.
>>>>>>>>>>>
>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>> Parul
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much again for the guidance provided.
>>>>>>>>>>>> Basically I would need a processor which would convert XML file
>>>>>>>>>>>> to Json.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>>>>>>
>>>>>>>>>>>> Here Flume source keeps sending data to flume sink. This flow
>>>>>>>>>>>> file would be of PDML format.
>>>>>>>>>>>>
>>>>>>>>>>>> Now I need a processor which would do the following
>>>>>>>>>>>>
>>>>>>>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>>>>>>>> and END TAG (</packet>)
>>>>>>>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>>>>>>>> 3) Place a json file to local directory using PutFile()
>>>>>>>>>>>> processor.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if I could able to explain the processor
>>>>>>>>>>>> requirement.
>>>>>>>>>>>> Would be really great if you could help me in this.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>> Parul
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <
>>>>>>>>>>>> joey42@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step.
>>>>>>>>>>>>> It's
>>>>>>>>>>>>> really useful if you have a complex Flume flow and want to
>>>>>>>>>>>>> migrate
>>>>>>>>>>>>> only parts of it over to NiFi at a time. I would port any
>>>>>>>>>>>>> custom
>>>>>>>>>>>>> sources and sinks to NiFi once you knew that it would meet
>>>>>>>>>>>>> your needs
>>>>>>>>>>>>> well. NiFi has a lot of documentation on writing processors
>>>>>>>>>>>>> and the
>>>>>>>>>>>>> concepts map pretty well if you're already familiar with
>>>>>>>>>>>>> Flume's
>>>>>>>>>>>>> execution model.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Joey
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Hi Parul,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi,
>>>>>>>>>>>>> but due to the way the Flume processors load the classes for the sources
>>>>>>>>>>>>> and sinks, the jar you deploy to the lib directory also needs to include
>>>>>>>>>>>>> the other dependencies your source/sink needs (or they each need to
>>>>>>>>>>>>> individually be in lib/ directly).
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > So here is a sample project I created that makes a shaded
>>>>>>>>>>>>> jar:
>>>>>>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > It will contain the custom source and following dependencies
>>>>>>>>>>>>> all in one jar:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>>>>>>> > Agent Name = a1
>>>>>>>>>>>>> > Source Name = r1
>>>>>>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Keep in mind that this could become risky because any
>>>>>>>>>>>>> classes found in the lib directory would be accessible to all NARs in NiFi
>>>>>>>>>>>>> and would be found before classes within a NAR because the parent is
>>>>>>>>>>>>> checked first during class loading. This example isn't too risky because we
>>>>>>>>>>>>> are only bringing in flume jars and one guava jar, but for example if
>>>>>>>>>>>>> another nar uses a different version of guava this is going to cause a
>>>>>>>>>>>>> problem.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > -Bryan
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Hello Bryan,
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Thank you very much for your response.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Is it possible to have customized flume source and sink in
>>>>>>>>>>>>> Nifi?
>>>>>>>>>>>>> >> I have my own customized source and sink? I followed below
>>>>>>>>>>>>> steps to add my own customized source but it did not work.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> 1) Created Maven project and added customized source.
>>>>>>>>>>>>> (flume.jar was created after this step)
>>>>>>>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Property           Value
>>>>>>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>>>>>>> >> Agent Name      a1
>>>>>>>>>>>>> >> Source Name         k1.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>>>>>>>> >> "Failed to run validation due to
>>>>>>>>>>>>> java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Can you please help me in this regard. Any
>>>>>>>>>>>>> step/configuration I missed.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Thanks and Regards,
>>>>>>>>>>>>> >> Parul
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <
>>>>>>>>>>>>> bbende@gmail.com> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Hello,
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> The NiFi Flume processors are for running Flume sources
>>>>>>>>>>>>> and sinks within NiFi. They don't communicate with an external Flume
>>>>>>>>>>>>> process.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>>>>>>> configured to run the netcat source, connected to a ExecuteFlumeSink
>>>>>>>>>>>>> configured with the logger.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> -Bryan
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Hi,
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> I was trying to run Nifi Flume processor with the below
>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>> >>>> details but could not bring it up.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>>>>>>>> >>>> =============================================
>>>>>>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> # Name the components on this agent
>>>>>>>>>>>>> >>>> a1.sources = r1
>>>>>>>>>>>>> >>>> a1.sinks = k1
>>>>>>>>>>>>> >>>> a1.channels = c1
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> # Describe/configure the source
>>>>>>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> # Describe the sink
>>>>>>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>>>>>>> >>>> =============================================
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf
>>>>>>>>>>>>> conf
>>>>>>>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following
>>>>>>>>>>>>> configuration was done:
>>>>>>>>>>>>> >>>> Property           Value
>>>>>>>>>>>>> >>>> Sink Type         logger
>>>>>>>>>>>>> >>>> Agent Name      a1
>>>>>>>>>>>>> >>>> Sink Name         k1.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>>>>>>> >>>> OK
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>>>>>>>> Request your
>>>>>>>>>>>>> >>>> help in this.
>>>>>>>>>>>>> >>>> Do I need to change the example.conf file of flume so
>>>>>>>>>>>>> that Nifi Flume
>>>>>>>>>>>>> >>>> Sink should get the data.
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Thanks and Regards,
>>>>>>>>>>>>> >>>> Parul
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> --
>>>>>>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>> --
>> Sent from Gmail Mobile
>>
>
>

Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hi Bryan,


I am trying to insert the following data into the database using the NiFi
processors *ConvertJSONToSQL* and *PutSQL*.

*Json Object used:*
 {"index":"1", "num":"1", "len":"58", "caplen":"54", "timestamp":"Nov  4,
2015 00:42:15.000000000 CST"}

Kindly find the *table description:*

maddb=# \d gen_info;
                                     Table "public.gen_info"
  Column   |           Type           |                        Modifiers
-----------+--------------------------+----------------------------------------------------------
 index     | bigint                   | not null default
nextval('gen_info_index_seq'::regclass)
 num       | integer                  |
 len       | integer                  |
 caplen    | integer                  |
 timestamp | timestamp with time zone |

While inserting the timestamp 'Nov  4, 2015 00:42:15.000000000 CST', the
following exception was thrown by the NiFi processor:

2015-11-04 00:42:16,798 ERROR [Timer-Driven Process Thread-10]
o.apache.nifi.processors.standard.PutSQL
PutSQL[id=04197ba8-25a5-4301-ad00-2cc139972877] Cannot update database for
StandardFlowFileRecord[uuid=b32687d6-b814-4063-83e4-27af8f16be0b,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1446619336301-1, container=default,
section=1], offset=8816,
length=70],offset=0,name=10468777551217095,size=70] due to
org.apache.nifi.processor.exception.ProcessException: The value of the
sql.args.1.value is 'Nov  4, 2015 00:42:15.000000000 CST', which cannot be
converted into the necessary data type; routing to failure:
org.apache.nifi.processor.exception.ProcessException: The value of the
sql.args.1.value is 'Nov  4, 2015 00:42:15.000000000 CST', which cannot be
converted into the necessary data type.

Manual insertion into the database works fine, but with NiFi I am getting the
above exception. Also, if the timestamp is in string format, no error is
thrown. Can you please help me in this regard?
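One possible workaround, purely as a sketch: normalize the tshark-style timestamp upstream (for example in the custom source, or via a ReplaceText step) into a form that java.sql.Timestamp and PostgreSQL accept. Illustrative Python; the target format is an assumption, and the timezone abbreviation is dropped for brevity:

```python
import re
from datetime import datetime

def to_sql_timestamp(raw):
    """Rewrite a tshark-style 'Nov  4, 2015 00:42:15.000000000 CST'
    timestamp into '2015-11-04 00:42:15.000000'."""
    s = re.sub(r"\s+", " ", raw.strip())  # tshark pads single-digit days
    m = re.match(r"(\w{3} \d{1,2}, \d{4} \d{2}:\d{2}:\d{2})\.(\d{9}) \w+", s)
    base = datetime.strptime(m.group(1), "%b %d, %Y %H:%M:%S")
    micros = int(m.group(2)) // 1000      # nanoseconds -> microseconds
    # NOTE: the timezone abbreviation is discarded here; a real fix
    # should map it to a UTC offset before insertion.
    return base.strftime("%Y-%m-%d %H:%M:%S") + ".%06d" % micros
```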

Thank you very much for all the guidance and support provided so far.

Thanks and Regards,
Parul



On Mon, Oct 26, 2015 at 5:14 PM, Bryan Bende <bb...@gmail.com> wrote:

> Hello,
>
> Can you tell us what you are trying to route on in the json? What regular
> expression did you try in RouteOnContent?
>
> -Bryan
>
>
> On Monday, October 26, 2015, Parul Agrawal <pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Thank you very much for all the support.
>> I have written a custom processor to split one JSON document into multiple JSON documents.
>> Now I would like to route the flowfile based on the content of the
>> flowfile.
>> I tried using RouteOnContent. But it did not work.
>>
>> Can you please help me with how I can route the flowfile based on the
>> content/data it contains.
>>
>> Thanks and Regards,
>> Parul
>>
>>
>>
>> On Tue, Oct 13, 2015 at 6:54 PM, Bryan Bende <bb...@gmail.com> wrote:
>>
>>> Parul,
>>>
>>> You can use SplitJson to take a large JSON document and split an array
>>> element into individual documents. I took the json you attached and created
>>> a flow like GetFile -> SplitJson -> SplitJson -> PutFile
>>>
>>> In the first SplitJson the path I used was $.packet.proto and in the
>>> second I used $.field  This seemed to mostly work except some of the splits
>>> going into PutFile still have another level of "field" which needs to be
>>> split again so would possibly need some conditional logic to split certain
>>> documents again.
>>>
>>> Alternatively you could write a custom processor that restructures your
>>> JSON.
>>>
>>> -Bryan
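As a rough analogue of what the first SplitJson step does (the document shape here is a made-up stand-in for the real PDML-derived JSON, not the actual file from the thread):

```python
import json

# Hypothetical PDML-shaped document; the real one has many more fields.
doc = {"packet": {"proto": [{"name": "geninfo"}, {"name": "ip"}]}}

# First SplitJson with JsonPath $.packet.proto: one flowfile per
# element of the "proto" array.
proto_splits = [json.dumps(p) for p in doc["packet"]["proto"]]
```

The second SplitJson with `$.field` would repeat the same operation on each split.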
>>>
>>>
>>>
>>> On Tue, Oct 13, 2015 at 8:36 AM, Parul Agrawal <parulagrawal14@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I tried with the above json element. But I am getting the below
>>>> mentioned error:
>>>>
>>>> 2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
>>>> StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>> section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
>>>> due to org.apache.nifi.processor.exception.ProcessException: IOException
>>>> thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
>>>> org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
>>>> 73)): expected a valid value (number, String, array, object, 'true',
>>>> 'false' or 'null')
>>>>
>>>> Also I have a huge json object attached (new.json). Can you guide me on
>>>> how do i use ConvertJSONToSQL processor.
>>>> Should I use any other processor before using ConvertJSONToSQL
>>>> processor so that this new.json can be converted in to a flat document
>>>> of key/value pairs, or an array of flat documents.
>>>>
>>>> Any help/guidance would be really useful.
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>> On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>>
>>>>> I think ConvertJSONToSQL expects a flat document of key/value pairs,
>>>>> or an array of flat documents. So I think your JSON would be:
>>>>>
>>>>> [
>>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>>> ]
>>>>>
>>>>> The table name will come from the Table Name property.
>>>>>
>>>>> Let us know if this doesn't work.
>>>>>
>>>>> -Bryan
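To make the two shapes concrete, here is a small sketch contrasting the wrapped form with the flat array that ConvertJSONToSQL expects (field names taken from the example above):

```python
import json

# The wrapped form from the original attempt is not a standalone
# JSON document, so parsing it fails:
wrapped = '"details":[{"firstname":"John","lastname":"Doe"}]'
try:
    json.loads(wrapped)
    parsed = True
except json.JSONDecodeError:
    parsed = False

# The flat array form: top-level array of flat key/value objects,
# with keys matching the table's column names.
flat = '[{"firstname":"John","lastname":"Doe"},{"firstname":"Anna","lastname":"Smith"}]'
records = json.loads(flat)
```

The table name itself comes from the processor's Table Name property, not from the JSON.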
>>>>>
>>>>>
>>>>> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thank you very much for all the support.
>>>>>> I was able to convert the XML format to JSON using a custom Flume source.
>>>>>>
>>>>>> Now I would need ConvertJSONToSQL processor to insert data into SQL.
>>>>>> I am trying to get hands-on on this processor. Will update you on
>>>>>> this.
>>>>>> Meanwhile if any example you could share to use this processor for a
>>>>>> sample
>>>>>> json data, then it would be great.
>>>>>>
>>>>>> ===============
>>>>>>
>>>>>> 1) I tried using ConvertJSONToSQL processor with the below sample
>>>>>> json file:
>>>>>>
>>>>>> "details":[
>>>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>>>> ]
>>>>>>
>>>>>> 2) I created the table *details* in PostgreSQL:
>>>>>> select * from details;
>>>>>>  firstname | lastname
>>>>>> -----------+----------
>>>>>> (0 rows)
>>>>>>
>>>>>> 3) ConvertJSONToSQL Processor property details are as below:
>>>>>> Property                                  Value
>>>>>> JDBC Connection Pool                      DBCPConnectionPool
>>>>>> Statement Type                            INSERT
>>>>>> Table Name                                details
>>>>>> Catalog Name                              No value set
>>>>>> Translate Field Names                     false
>>>>>> Unmatched Field Behavior                  Ignore Unmatched Fields
>>>>>> Update Keys                               No value set
>>>>>>
>>>>>> But I am getting the below mentioned error in ConvertJSONToSQL
>>>>>> Processor.
>>>>>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>>>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>>>>>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>>>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>>>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>>>>>> SQL INSERT statement due to
>>>>>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>>>>>> the JSON map to the columns defined by the details table; routing to
>>>>>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>>>>>> fields in the JSON map to the columns defined by the details table
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've done something like this by wrapping the command in a shell
>>>>>>> script:
>>>>>>>
>>>>>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>>>>>
>>>>>>> My use case was slightly different, but I'm pretty sure you can
>>>>>>> adapt the same idea.
>>>>>>>
>>>>>>> -Joey
>>>>>>>
>>>>>>> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I actually need to get the data from pipe.
>>>>>>> So the actual command I would need is mkfifo /tmp/packet;tshark -i
>>>>>>> ens160 -T pdml >/tmp/packet.
>>>>>>> Is it possible to use ExecuteProcessor for multiple commands ?
>>>>>>>
>>>>>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <
>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I added custom flume source and when flume source is sending the
>>>>>>>> data to flume sink, below mentioned error is thrown at flume sink.
>>>>>>>>
>>>>>>>>  Administratively Yielded for 1 sec due to processing failure
>>>>>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>>>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>>>>>> is OPEN - you must either commit or rollback first
>>>>>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>>>>>> java.lang.IllegalStateException: close() called when transaction is
>>>>>>>> OPEN - you must either commit or rollback first
>>>>>>>>         at
>>>>>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>>>>>> ~[guava-r05.jar:na]
>>>>>>>>         at
>>>>>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>>>         at
>>>>>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>>>         at
>>>>>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>>>>>> ~[na:na]
>>>>>>>>         at
>>>>>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>>>>>> ~[na:na]
>>>>>>>>         at
>>>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>>>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>         at
>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>         at
>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>         at
>>>>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>>         at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>>>>> [na:1.7.0_85]
>>>>>>>>         at
>>>>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>>>>> [na:1.7.0_85]
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>>>>> [na:1.7.0_85]
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>> [na:1.7.0_85]
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> [na:1.7.0_85]
>>>>>>>>         at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> [na:1.7.0_85]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>>>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>>>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>>>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>>>> section=7], offset=180436,
>>>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>>>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>>>> section=7], offset=180436,
>>>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>>>> this session (StandardProcessSession[id=218318])
>>>>>>>>
>>>>>>>> Any idea what could be wrong in this.
>>>>>>>>
>>>>>>>> Thanks and Regards,
>>>>>>>> Parul
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Parul,
>>>>>>>>>
>>>>>>>>> I think it would be good to keep the convo going on the users list
>>>>>>>>> since there are more people who can offer help there, and also helps
>>>>>>>>> everyone learn new solutions.
>>>>>>>>>
>>>>>>>>> The quick answer though is that NiFi has an ExecuteProcess
>>>>>>>>> processor which could execute "tshark -i eth0 -T pdml".
>>>>>>>>>
>>>>>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>>>>>> place where you need a custom processor. For simple cases you can use an
>>>>>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>>>>>> ReplaceText processor to build a new json document from those extracted
>>>>>>>>> values.
>>>>>>>>>
>>>>>>>>> -Bryan
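A minimal sketch of that EvaluateXPath-then-ReplaceText idea, with a made-up PDML fragment and field name; this is only an illustration of the two steps, not NiFi code:

```python
import json
import xml.etree.ElementTree as ET

# Hypothetical PDML fragment; real tshark output is much larger.
xml_doc = "<packet><proto name='ip'><field name='ip.src' show='10.0.0.1'/></proto></packet>"

# Step 1 (like EvaluateXPath): pull a value out with an XPath expression.
root = ET.fromstring(xml_doc)
src = root.find("./proto/field[@name='ip.src']").get("show")

# Step 2 (like ReplaceText): build a new JSON document from the
# extracted value.
flowfile = json.dumps({"ip.src": src})
```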
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <
>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Little more to add.....
>>>>>>>>>>  I need to keep reading the flowfile till END_TAG is received.
>>>>>>>>>> i.e. we may need to concatenate the flowfile data till END_TAG.
>>>>>>>>>> and then convert it to json and call PutFile() processor.
>>>>>>>>>>
>>>>>>>>>> Thanks and Regards,
>>>>>>>>>> Parul
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Thank you very much again for the guidance provided.
>>>>>>>>>>> Basically I would need a processor which would convert XML file
>>>>>>>>>>> to Json.
>>>>>>>>>>>
>>>>>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>>>>>
>>>>>>>>>>> Here Flume source keeps sending data to flume sink. This flow
>>>>>>>>>>> file would be of PDML format.
>>>>>>>>>>>
>>>>>>>>>>> Now I need a processor which would do the following
>>>>>>>>>>>
>>>>>>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>>>>>>> and END TAG (</packet>)
>>>>>>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>>>>>>> 3) Place a json file to local directory using PutFile()
>>>>>>>>>>> processor.
>>>>>>>>>>>
>>>>>>>>>>> I am not sure if I was able to explain the processor
>>>>>>>>>>> requirement clearly.
>>>>>>>>>>> It would be really great if you could help me with this.
>>>>>>>>>>>
>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>> Parul
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <
>>>>>>>>>>> joey42@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>>>
>>>>>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step.
>>>>>>>>>>>> It's
>>>>>>>>>>>> really useful if you have a complex Flume flow and want to
>>>>>>>>>>>> migrate
>>>>>>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>>>>>>> sources and sinks to NiFi once you knew that it would meet your
>>>>>>>>>>>> needs
>>>>>>>>>>>> well. NiFi has a lot of documentation on writing processors and
>>>>>>>>>>>> the
>>>>>>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>>>>>>> execution model.
>>>>>>>>>>>>
>>>>>>>>>>>> -Joey
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hi Parul,
>>>>>>>>>>>> >
>>>>>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi,
>>>>>>>>>>>> but due to the way the Flume processors load the classes for the sources
>>>>>>>>>>>> and sinks, the jar you deploy to the lib directory also needs to include
>>>>>>>>>>>> the other dependencies your source/sink needs (or they each need to
>>>>>>>>>>>> individually be in lib/ directly).
>>>>>>>>>>>> >
>>>>>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>>>>>> >
>>>>>>>>>>>> > It will contain the custom source and following dependencies
>>>>>>>>>>>> all in one jar:
>>>>>>>>>>>> >
>>>>>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>>>>>> >
>>>>>>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>>>>>> > Agent Name = a1
>>>>>>>>>>>> > Source Name = r1
>>>>>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>>>>>> >
>>>>>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>>>>>> >
>>>>>>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>>>>>>> only bringing in flume jars and one guava jar, but for example if another
>>>>>>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>>>>>>> >
>>>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>>> >
>>>>>>>>>>>> > -Bryan
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Hello Bryan,
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thank you very much for your response.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Is it possible to have customized flume source and sink in
>>>>>>>>>>>> Nifi?
>>>>>>>>>>>> >> I have my own customized source and sink? I followed below
>>>>>>>>>>>> steps to add my own customized source but it did not work.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> 1) Created Maven project and added customized source.
>>>>>>>>>>>> (flume.jar was created after this step)
>>>>>>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Property           Value
>>>>>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>>>>>> >> Agent Name      a1
>>>>>>>>>>>> >> Source Name         k1.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>>>>>>> >> "Failed to run validation due to
>>>>>>>>>>>> java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Can you please help me in this regard. Any
>>>>>>>>>>>> step/configuration I missed.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thanks and Regards,
>>>>>>>>>>>> >> Parul
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <
>>>>>>>>>>>> bbende@gmail.com> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Hello,
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>>>>>> sinks within NiFi. They don't communicate with an external Flume process.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>>>>>> configured to run the netcat source, connected to a ExecuteFlumeSink
>>>>>>>>>>>> configured with the logger.
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> -Bryan
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Hi,
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> I was trying to run Nifi Flume processor with the below
>>>>>>>>>>>> mentioned
>>>>>>>>>>>> >>>> details but could not bring it up.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>>>>>>> >>>> =============================================
>>>>>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> # Name the components on this agent
>>>>>>>>>>>> >>>> a1.sources = r1
>>>>>>>>>>>> >>>> a1.sinks = k1
>>>>>>>>>>>> >>>> a1.channels = c1
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> # Describe/configure the source
>>>>>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> # Describe the sink
>>>>>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>>>>>> >>>> =============================================
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf
>>>>>>>>>>>> conf
>>>>>>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following
>>>>>>>>>>>> configuration was done:
>>>>>>>>>>>> >>>> Property           Value
>>>>>>>>>>>> >>>> Sink Type         logger
>>>>>>>>>>>> >>>> Agent Name      a1
>>>>>>>>>>>> >>>> Sink Name         k1.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>>>>>> >>>> OK
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>>>>>>> Request your
>>>>>>>>>>>> >>>> help in this.
>>>>>>>>>>>> >>>> Do i need to change the example.conf file of flume so that
>>>>>>>>>>>> Nifi Flume
>>>>>>>>>>>> >>>> Sink should get the data.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Thanks and Regards,
>>>>>>>>>>>> >>>> Parul
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> --
>>>>>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
> --
> Sent from Gmail Mobile
>

Re: Need help in nifi- flume processor

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

Can you tell us what you are trying to route on in the JSON? What regular
expression did you try in RouteOnContent?
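
In case it helps frame the answer: RouteOnContent evaluates a user-supplied
regular expression against the raw flowfile content and routes the flowfile
to the relationship whose expression matches. A minimal Python sketch of that
matching logic (the JSON content and field names below are made up for
illustration, not taken from your actual flow):

```python
import re

# Hypothetical flowfile content: one of the split JSON documents.
flowfile_content = '{"name": "ip.src", "show": "10.0.0.1"}'

# With the "content must contain match" strategy a partial match is
# enough, so the pattern only targets a single field name.
pattern = re.compile(r'"name":\s*"ip\.src"')

# Route to a named relationship on match, otherwise to "unmatched".
route = "ip_packets" if pattern.search(flowfile_content) else "unmatched"
print(route)
```

One thing to check: if the Match Requirement property is left at "content
must match exactly", the expression has to match the entire content, which is
a common reason a seemingly correct pattern routes everything to unmatched.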

-Bryan

On Monday, October 26, 2015, Parul Agrawal <pa...@gmail.com> wrote:

> Hi,
>
> Thank you very much for all the support.
> I have written a custom processor to split json to multiple json.
> Now I would like to route the flowfile based on the content of the
> flowfile.
> I tried using RouteOnContent. But it did not work.
>
> Can you please help me how can i route the flowfile based on the
> content/data it contains.
>
> Thanks and Regards,
> Parul
>
>
>
> On Tue, Oct 13, 2015 at 6:54 PM, Bryan Bende <bbende@gmail.com> wrote:
>
>> Parul,
>>
>> You can use SplitJson to take a large JSON document and split an array
>> element into individual documents. I took the json you attached and created
>> a flow like GetFile -> SplitJson -> SplitJson -> PutFile
>>
>> In the first SplitJson the path I used was $.packet.proto and in the
>> second I used $.field  This seemed to mostly work except some of the splits
>> going into PutFile still have another level of "field" which needs to be
>> split again so would possibly need some conditional logic to split certain
>> documents again.
>>
>> Alternatively you could write a custom processor that restructures your
>> JSON.
>>
>> -Bryan
>>
>>
>>
>> On Tue, Oct 13, 2015 at 8:36 AM, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I tried with the above json element. But I am getting the below
>>> mentioned error:
>>>
>>> 2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
>>> o.a.n.p.standard.ConvertJSONToSQL
>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
>>> StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>> section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
>>> due to org.apache.nifi.processor.exception.ProcessException: IOException
>>> thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
>>> org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
>>> 73)): expected a valid value (number, String, array, object, 'true',
>>> 'false' or 'null')
>>>
>>> Also I have a huge json object attached (new.json). Can you guide me on
>>> how do i use ConvertJSONToSQL processor.
>>> Should I use any other processor before using ConvertJSONToSQL processor
>>> so that this new.json can be converted in to a flat document of
>>> key/value pairs, or an array of flat documents.
>>>
>>> Any help/guidance would be really useful.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>> On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bbende@gmail.com> wrote:
>>>
>>>> I think ConvertJSONToSQL expects a flat document of key/value pairs, or
>>>> an array of flat documents. So I think your JSON would be:
>>>>
>>>> [
>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>> ]
>>>>
>>>> The table name will come from the Table Name property.
>>>>
>>>> Let us know if this doesn't work.
>>>>
>>>> -Bryan
>>>>
>>>>
>>>> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thank you very much for all the support.
>>>>> I could able to convert XML format to json  using custom flume source.
>>>>>
>>>>> Now I would need ConvertJSONToSQL processor to insert data into SQL.
>>>>> I am trying to get hands-on on this processor. Will update you on this.
>>>>> Meanwhile if any example you could share to use this processor for a
>>>>> sample
>>>>> json data, then it would be great.
>>>>>
>>>>> ===============
>>>>>
>>>>> 1) I tried using ConvertJSONToSQL processor with the below sample json
>>>>> file:
>>>>>
>>>>> "details":[
>>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>>> ]
>>>>>
>>>>> 2) I created table *details *in the postgreSQL
>>>>> * select * from details ;*
>>>>> * firstname | lastname*
>>>>> *-----------+----------*
>>>>> *(0 rows)*
>>>>>
>>>>> 3) ConvertJSONToSQL Processor property details are as below:
>>>>> *Property*                   *Value*
>>>>> JDBC Connection Pool         DBCPConnectionPool
>>>>> Statement Type               INSERT
>>>>> Table Name                   details
>>>>> Catalog Name                 No value set
>>>>> Translate Field Names        false
>>>>> Unmatched Field Behavior     Ignore Unmatched Fields
>>>>> Update Keys                  No value set
>>>>>
>>>>> But I am getting the below mentioned error in ConvertJSONToSQL
>>>>> Processor.
>>>>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>>>>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>>>>> SQL INSERT statement due to
>>>>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>>>>> the JSON map to the columns defined by the details table; routing to
>>>>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>>>>> fields in the JSON map to the columns defined by the details table
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
>>>>>
>>>>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <joey42@gmail.com> wrote:
>>>>>
>>>>>> I've done something like this by wrapping the command in a shell
>>>>>> script:
>>>>>>
>>>>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>>>>
>>>>>> My use case was slightly different, but I'm pretty sure you can adapt
>>>>>> the same idea.
>>>>>>
>>>>>> -Joey
>>>>>>
>>>>>> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I actually need to get the data from pipe.
>>>>>> So the actual command I would need is mkfifo /tmp/packet;tshark -i
>>>>>> ens160 -T pdml >/tmp/packet.
>>>>>> Is it possible to use ExecuteProcessor for multiple commands ?
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I added custom flume source and when flume source is sending the
>>>>>>> data to flume sink, below mentioned error is thrown at flume sink.
>>>>>>>
>>>>>>>  Administratively Yielded for 1 sec due to processing failure
>>>>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>>>>> is OPEN - you must either commit or rollback first
>>>>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>>>>> java.lang.IllegalStateException: close() called when transaction is
>>>>>>> OPEN - you must either commit or rollback first
>>>>>>>         at
>>>>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>>>>> ~[guava-r05.jar:na]
>>>>>>>         at
>>>>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>>         at
>>>>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>>         at
>>>>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>>>>> ~[na:na]
>>>>>>>         at
>>>>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>>>>> ~[na:na]
>>>>>>>         at
>>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>         at
>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>         at
>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>         at
>>>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>>         at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>>>> [na:1.7.0_85]
>>>>>>>         at
>>>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>>>> [na:1.7.0_85]
>>>>>>>         at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>>>> [na:1.7.0_85]
>>>>>>>         at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>> [na:1.7.0_85]
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> [na:1.7.0_85]
>>>>>>>         at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> [na:1.7.0_85]
>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>>> section=7], offset=180436,
>>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>>> section=7], offset=180436,
>>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>>> this session (StandardProcessSession[id=218318])
>>>>>>>
>>>>>>> Any idea what could be wrong in this.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Parul
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbende@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Parul,
>>>>>>>>
>>>>>>>> I think it would be good to keep the convo going on the users list
>>>>>>>> since there are more people who can offer help there, and also helps
>>>>>>>> everyone learn new solutions.
>>>>>>>>
>>>>>>>> The quick answer though is that NiFi has an ExecuteProcess
>>>>>>>> processor which could execute "tshark -i eth0 -T pdml".
>>>>>>>>
>>>>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>>>>> place where you need a custom processor. For simple cases you can use an
>>>>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>>>>> ReplaceText processor to build a new json document from those extracted
>>>>>>>> values.
>>>>>>>>
>>>>>>>> -Bryan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Little more to add.....
>>>>>>>>>  I need to keep reading the flowfile till END_TAG is received.
>>>>>>>>> i.e. we may need to concatenate the flowfile data till END_TAG.
>>>>>>>>> and then convert it to json and call PutFile() processor.
>>>>>>>>>
>>>>>>>>> Thanks and Regards,
>>>>>>>>> Parul
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Thank you very much again for the guidance provided.
>>>>>>>>>> Basically I would need a processor which would convert XML file
>>>>>>>>>> to Json.
>>>>>>>>>>
>>>>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>>>>
>>>>>>>>>> Here Flume source keeps sending data to flume sink. This flow
>>>>>>>>>> file would be of PDML format.
>>>>>>>>>>
>>>>>>>>>> Now I need a processor which would do the following
>>>>>>>>>>
>>>>>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>>>>>> and END TAG (</packet>)
>>>>>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>>>>>>
>>>>>>>>>> I am not sure if I could able to explain the processor
>>>>>>>>>> requirement.
>>>>>>>>>> Would be really great if you could help me in this.
>>>>>>>>>>
>>>>>>>>>> Thanks and Regards,
>>>>>>>>>> Parul
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joey42@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>>
>>>>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step.
>>>>>>>>>>> It's
>>>>>>>>>>> really useful if you have a complex Flume flow and want to
>>>>>>>>>>> migrate
>>>>>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>>>>>> sources and sinks to NiFi once you knew that it would meet your
>>>>>>>>>>> needs
>>>>>>>>>>> well. NiFi has a lot of documentation on writing processors and
>>>>>>>>>>> the
>>>>>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>>>>>> execution model.
>>>>>>>>>>>
>>>>>>>>>>> -Joey
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbende@gmail.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi Parul,
>>>>>>>>>>> >
>>>>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi,
>>>>>>>>>>> but due to the way the Flume processors load the classes for the sources
>>>>>>>>>>> and sinks, the jar you deploy to the lib directory also needs to include
>>>>>>>>>>> the other dependencies your source/sink needs (or they each need to
>>>>>>>>>>> individually be in lib/ directly).
>>>>>>>>>>> >
>>>>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>>>>> >
>>>>>>>>>>> > It will contain the custom source and following dependencies
>>>>>>>>>>> all in one jar:
>>>>>>>>>>> >
>>>>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>>>>> >
>>>>>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>>>>> >
>>>>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>>>>> > Agent Name = a1
>>>>>>>>>>> > Source Name = r1
>>>>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>>>>> >
>>>>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>>>>> >
>>>>>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>>>>>> only bringing in flume jars and one guava jar, but for example if another
>>>>>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>>>>>> >
>>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>> >
>>>>>>>>>>> > -Bryan
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Hello Bryan,
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thank you very much for your response.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Is it possible to have customized flume source and sink in
>>>>>>>>>>> Nifi?
>>>>>>>>>>> >> I have my own customized source and sink? I followed below
>>>>>>>>>>> steps to add my own customized source but it did not work.
>>>>>>>>>>> >>
>>>>>>>>>>> >> 1) Created Maven project and added customized source.
>>>>>>>>>>> (flume.jar was created after this step)
>>>>>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>>>>>> >>
>>>>>>>>>>> >> Property           Value
>>>>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>>>>> >> Agent Name      a1
>>>>>>>>>>> >> Source Name         k1.
>>>>>>>>>>> >>
>>>>>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>>>>>> >> "Failed to run validation due to
>>>>>>>>>>> java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>>>>>>>> >>
>>>>>>>>>>> >> Can you please help me in this regard. Any step/configuration
>>>>>>>>>>> I missed.
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thanks and Regards,
>>>>>>>>>>> >> Parul
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbende@gmail.com> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Hello,
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>>>>> sinks with in NiFi. They don't communicate with an external Flume process.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>>>>> configured to run the netcat source, connected to a ExecuteFlumeSink
>>>>>>>>>>> configured with the logger.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> -Bryan
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <parulagrawal14@gmail.com> wrote:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Hi,
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> I was trying to run Nifi Flume processor with the below
>>>>>>>>>>> mentioned
>>>>>>>>>>> >>>> details but not could bring it up.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>>>>>> >>>> =============================================
>>>>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> # Name the components on this agent
>>>>>>>>>>> >>>> a1.sources = r1
>>>>>>>>>>> >>>> a1.sinks = k1
>>>>>>>>>>> >>>> a1.channels = c1
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> # Describe/configure the source
>>>>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> # Describe the sink
>>>>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>>>>> >>>> =============================================
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf
>>>>>>>>>>> conf
>>>>>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following
>>>>>>>>>>> configuration was done:
>>>>>>>>>>> >>>> Property           Value
>>>>>>>>>>> >>>> Sink Type         logger
>>>>>>>>>>> >>>> Agent Name      a1
>>>>>>>>>>> >>>> Sink Name         k1.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>>>>> >>>> OK
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>>>>>> Request your
>>>>>>>>>>> >>>> help in this.
>>>>>>>>>>> >>>> Do i need to change the example.conf file of flume so that
>>>>>>>>>>> Nifi Flume
>>>>>>>>>>> >>>> Sink should get the data.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Thanks and Regards,
>>>>>>>>>>> >>>> Parul
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> --
>>>>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

-- 
Sent from Gmail Mobile

Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hi,

Thank you very much for all the support.
I have written a custom processor that splits a JSON document into multiple
JSON documents. Now I would like to route each flowfile based on its content.
I tried using RouteOnContent, but it did not work.

Can you please help me route the flowfile based on the content/data it
contains?
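
Roughly, the split works like SplitJson over the packet/proto structure from
earlier in the thread: each element of a JSON array becomes its own document.
A simplified Python sketch of that idea (the document shape below is
illustrative, not the actual PDML-derived JSON, and this is not the processor
code itself):

```python
import json

# Illustrative input shaped loosely like the tshark PDML-derived JSON
# discussed in this thread: a "packet" holding a list of "proto" entries.
doc = {
    "packet": {
        "proto": [
            {"name": "ip", "field": [{"name": "ip.src"}, {"name": "ip.dst"}]},
            {"name": "tcp", "field": [{"name": "tcp.port"}]},
        ]
    }
}

# Equivalent of SplitJson with a JsonPath of $.packet.proto: every array
# element is serialized as an independent JSON document (flowfile).
splits = [json.dumps(proto) for proto in doc["packet"]["proto"]]

for s in splits:
    print(s)
```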

Thanks and Regards,
Parul



On Tue, Oct 13, 2015 at 6:54 PM, Bryan Bende <bb...@gmail.com> wrote:

> Parul,
>
> You can use SplitJson to take a large JSON document and split an array
> element into individual documents. I took the json you attached and created
> a flow like GetFile -> SplitJson -> SplitJson -> PutFile
>
> In the first SplitJson the path I used was $.packet.proto and in the
> second I used $.field  This seemed to mostly work except some of the splits
> going into PutFile still have another level of "field" which needs to be
> split again so would possibly need some conditional logic to split certain
> documents again.
>
> Alternatively you could write a custom processor that restructures your
> JSON.
>
> -Bryan
>
>
>
> On Tue, Oct 13, 2015 at 8:36 AM, Parul Agrawal <pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I tried with the above json element. But I am getting the below mentioned
>> error:
>>
>> 2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
>> o.a.n.p.standard.ConvertJSONToSQL
>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
>> StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>> section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
>> due to org.apache.nifi.processor.exception.ProcessException: IOException
>> thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
>> org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
>> 73)): expected a valid value (number, String, array, object, 'true',
>> 'false' or 'null')
>>
>> Also I have a huge json object attached (new.json). Can you guide me on
>> how do i use ConvertJSONToSQL processor.
>> Should I use any other processor before using ConvertJSONToSQL processor
>> so that this new.json can be converted in to a flat document of
>> key/value pairs, or an array of flat documents.
>>
>> Any help/guidance would be really useful.
>>
>> Thanks and Regards,
>> Parul
>>
>> On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bb...@gmail.com> wrote:
>>
>>> I think ConvertJSONToSQL expects a flat document of key/value pairs, or
>>> an array of flat documents. So I think your JSON would be:
>>>
>>> [
>>>     {"firstname":"John", "lastname":"Doe"},
>>>     {"firstname":"Anna", "lastname":"Smith"}
>>> ]
>>>
>>> The table name will come from the Table Name property.
>>>
>>> Let us know if this doesn't work.
>>>
>>> -Bryan
>>>
>>>
>>> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <
>>> parulagrawal14@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thank you very much for all the support.
>>>> I could able to convert XML format to json  using custom flume source.
>>>>
>>>> Now I would need ConvertJSONToSQL processor to insert data into SQL.
>>>> I am trying to get hands-on on this processor. Will update you on this.
>>>> Meanwhile if any example you could share to use this processor for a
>>>> sample
>>>> json data, then it would be great.
>>>>
>>>> ===============
>>>>
>>>> 1) I tried using ConvertJSONToSQL processor with the below sample json
>>>> file:
>>>>
>>>> "details":[
>>>>     {"firstname":"John", "lastname":"Doe"},
>>>>     {"firstname":"Anna", "lastname":"Smith"}
>>>> ]
>>>>
>>>> 2) I created table *details *in the postgreSQL
>>>> * select * from details ;*
>>>> * firstname | lastname*
>>>> *-----------+----------*
>>>> *(0 rows)*
>>>>
>>>> 3) ConvertJSONToSQL Processor property details are as below:
>>>> *Property*                   *Value*
>>>> JDBC Connection Pool         DBCPConnectionPool
>>>> Statement Type               INSERT
>>>> Table Name                   details
>>>> Catalog Name                 No value set
>>>> Translate Field Names        false
>>>> Unmatched Field Behavior     Ignore Unmatched Fields
>>>> Update Keys                  No value set
>>>>
>>>> But I am getting the below mentioned error in ConvertJSONToSQL
>>>> Processor.
>>>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>>>> o.a.n.p.standard.ConvertJSONToSQL
>>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>>>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>>>> SQL INSERT statement due to
>>>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>>>> the JSON map to the columns defined by the details table; routing to
>>>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>>>> fields in the JSON map to the columns defined by the details table
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> I've done something like this by wrapping the command in a shell
>>>>> script:
>>>>>
>>>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>>>
>>>>> My use case was slightly different, but I'm pretty sure you can adapt
>>>>> the same idea.
>>>>>
>>>>> -Joey
>>>>>
>>>>> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I actually need to get the data from pipe.
>>>>> So the actual command I would need is mkfifo /tmp/packet;tshark -i
>>>>> ens160 -T pdml >/tmp/packet.
>>>>> Is it possible to use ExecuteProcessor for multiple commands ?
>>>>>
>>>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I added custom flume source and when flume source is sending the data
>>>>>> to flume sink, below mentioned error is thrown at flume sink.
>>>>>>
>>>>>>  Administratively Yielded for 1 sec due to processing failure
>>>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>>>> is OPEN - you must either commit or rollback first
>>>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>>>> java.lang.IllegalStateException: close() called when transaction is
>>>>>> OPEN - you must either commit or rollback first
>>>>>>         at
>>>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>>>> ~[guava-r05.jar:na]
>>>>>>         at
>>>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>         at
>>>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>>         at
>>>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>>>> ~[na:na]
>>>>>>         at
>>>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>>>> ~[na:na]
>>>>>>         at
>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at
>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at
>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at
>>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>>         at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>>> [na:1.7.0_85]
>>>>>>         at
>>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>>> [na:1.7.0_85]
>>>>>>         at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>>> [na:1.7.0_85]
>>>>>>         at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>> [na:1.7.0_85]
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> [na:1.7.0_85]
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> [na:1.7.0_85]
>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>> section=7], offset=180436,
>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>>> section=7], offset=180436,
>>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>>> this session (StandardProcessSession[id=218318])
>>>>>>
>>>>>> Any idea what could be wrong in this.
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Parul,
>>>>>>>
>>>>>>> I think it would be good to keep the conversation going on the users
>>>>>>> list, since there are more people who can offer help there, and it also
>>>>>>> helps everyone learn new solutions.
>>>>>>>
>>>>>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>>>>>> which could execute "tshark -i eth0 -T pdml".
>>>>>>>
>>>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>>>> place where you need a custom processor. For simple cases you can use an
>>>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>>>> ReplaceText processor to build a new json document from those extracted
>>>>>>> values.
>>>>>>>
>>>>>>> -Bryan
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <
>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> A little more to add: I need to keep reading the flowfile until the
>>>>>>>> END_TAG is received, i.e. we may need to concatenate the flowfile data
>>>>>>>> until the END_TAG, and then convert it to JSON and call the PutFile processor.
>>>>>>>>
>>>>>>>> Thanks and Regards,
>>>>>>>> Parul
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thank you very much again for the guidance provided.
>>>>>>>>> Basically I would need a processor which would convert XML file to
>>>>>>>>> Json.
>>>>>>>>>
>>>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>>>
>>>>>>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>>>>>>> would be of PDML format.
>>>>>>>>>
>>>>>>>>> Now I need a processor which would do the following
>>>>>>>>>
>>>>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>>>>> and END TAG (</packet>)
>>>>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>>>>>
>>>>>>>>> I am not sure if I was able to explain the processor requirement.
>>>>>>>>> Would be really great if you could help me in this.
>>>>>>>>>
>>>>>>>>> Thanks and Regards,
>>>>>>>>> Parul
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joey42@gmail.com
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>>
>>>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step.
>>>>>>>>>> It's
>>>>>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>>>>> sources and sinks to NiFi once you knew that it would meet your
>>>>>>>>>> needs
>>>>>>>>>> well. NiFi has a lot of documentation on writing processors and
>>>>>>>>>> the
>>>>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>>>>> execution model.
>>>>>>>>>>
>>>>>>>>>> -Joey
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi Parul,
>>>>>>>>>> >
>>>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi,
>>>>>>>>>> but due to the way the Flume processors load the classes for the sources
>>>>>>>>>> and sinks, the jar you deploy to the lib directory also needs to include
>>>>>>>>>> the other dependencies your source/sink needs (or they each need to
>>>>>>>>>> individually be in lib/ directly).
>>>>>>>>>> >
>>>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>>>> >
>>>>>>>>>> > It will contain the custom source and following dependencies
>>>>>>>>>> all in one jar:
>>>>>>>>>> >
>>>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>>>> >
>>>>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>>>> >
>>>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>>>> > Agent Name = a1
>>>>>>>>>> > Source Name = r1
>>>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>>>> >
>>>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>>>> >
>>>>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>>>>> only bringing in flume jars and one guava jar, but for example if another
>>>>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>>>>> >
>>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>> >
>>>>>>>>>> > -Bryan
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> Hello Bryan,
>>>>>>>>>> >>
>>>>>>>>>> >> Thank you very much for your response.
>>>>>>>>>> >>
>>>>>>>>>> >> Is it possible to have customized flume source and sink in
>>>>>>>>>> Nifi?
>>>>>>>>>> >> I have my own customized source and sink? I followed below
>>>>>>>>>> steps to add my own customized source but it did not work.
>>>>>>>>>> >>
>>>>>>>>>> >> 1) Created Maven project and added customized source.
>>>>>>>>>> (flume.jar was created after this step)
>>>>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>>>>> >>
>>>>>>>>>> >> Property           Value
>>>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>>>> >> Agent Name      a1
>>>>>>>>>> >> Source Name         k1.
>>>>>>>>>> >>
>>>>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>>>>> >> "Failed to run validation due to
>>>>>>>>>> java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>>>>>>> >>
>>>>>>>>>> >> Can you please help me in this regard. Any step/configuration
>>>>>>>>>> I missed.
>>>>>>>>>> >>
>>>>>>>>>> >> Thanks and Regards,
>>>>>>>>>> >> Parul
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> Hello,
>>>>>>>>>> >>>
>>>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>>>> sinks with in NiFi. They don't communicate with an external Flume process.
>>>>>>>>>> >>>
>>>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>>>> configured to run the netcat source, connected to a ExecuteFlumeSink
>>>>>>>>>> configured with the logger.
>>>>>>>>>> >>>
>>>>>>>>>> >>> -Bryan
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Hi,
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I was trying to run the NiFi Flume processor with the
>>>>>>>>>> >>>> below-mentioned details but could not bring it up.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>>>>> >>>> =============================================
>>>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Name the components on this agent
>>>>>>>>>> >>>> a1.sources = r1
>>>>>>>>>> >>>> a1.sinks = k1
>>>>>>>>>> >>>> a1.channels = c1
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Describe/configure the source
>>>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Describe the sink
>>>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>>>> >>>> =============================================
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf
>>>>>>>>>> conf
>>>>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following
>>>>>>>>>> configuration was done:
>>>>>>>>>> >>>> Property           Value
>>>>>>>>>> >>>> Sink Type         logger
>>>>>>>>>> >>>> Agent Name      a1
>>>>>>>>>> >>>> Sink Name         k1.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>>>> >>>> OK
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>>>>> Request your
>>>>>>>>>> >>>> help in this.
>>>>>>>>>> >>>> Do i need to change the example.conf file of flume so that
>>>>>>>>>> Nifi Flume
>>>>>>>>>> >>>> Sink should get the data.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Thanks and Regards,
>>>>>>>>>> >>>> Parul
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> --
>>>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Need help in nifi- flume processor

Posted by Bryan Bende <bb...@gmail.com>.
Parul,

You can use SplitJson to take a large JSON document and split an array
element into individual documents. I took the JSON you attached and created
a flow like GetFile -> SplitJson -> SplitJson -> PutFile.

In the first SplitJson the path I used was $.packet.proto, and in the second
I used $.field. This mostly worked, except that some of the splits going
into PutFile still have another level of "field" that needs to be split
again, so you would possibly need some conditional logic to split certain
documents a second time.

Alternatively you could write a custom processor that restructures your
JSON.

-Bryan



On Tue, Oct 13, 2015 at 8:36 AM, Parul Agrawal <pa...@gmail.com>
wrote:

> Hi,
>
> I tried with the above JSON element, but I am getting the error below:
>
> 2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
> o.a.n.p.standard.ConvertJSONToSQL
> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
> StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
> section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
> due to org.apache.nifi.processor.exception.ProcessException: IOException
> thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
> org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
> 73)): expected a valid value (number, String, array, object, 'true',
> 'false' or 'null')
>
> Also, I have a huge JSON object attached (new.json). Can you guide me on
> how to use the ConvertJSONToSQL processor?
> Should I use another processor before ConvertJSONToSQL so that new.json
> can first be converted into a flat document of key/value pairs, or an
> array of flat documents?
>
> Any help/guidance would be really useful.
>
> Thanks and Regards,
> Parul
>
> On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bb...@gmail.com> wrote:
>
>> I think ConvertJSONToSQL expects a flat document of key/value pairs, or
>> an array of flat documents. So I think your JSON would be:
>>
>> [
>>     {"firstname":"John", "lastname":"Doe"},
>>     {"firstname":"Anna", "lastname":"Smith"}
>> ]
>>
>> The table name will come from the Table Name property.
>>
>> Let us know if this doesn't work.
>>
>> -Bryan
>>
>>
>> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <parulagrawal14@gmail.com
>> > wrote:
>>
>>> Hi,
>>>
>>> Thank you very much for all the support.
>>> I was able to convert the XML format to JSON using a custom flume source.
>>>
>>> Now I would need ConvertJSONToSQL processor to insert data into SQL.
>>> I am trying to get hands-on with this processor and will update you on it.
>>> Meanwhile, if you could share an example of using this processor on
>>> sample JSON data, that would be great.
>>>
>>> ===============
>>>
>>> 1) I tried using ConvertJSONToSQL processor with the below sample json
>>> file:
>>>
>>> "details":[
>>>     {"firstname":"John", "lastname":"Doe"},
>>>     {"firstname":"Anna", "lastname":"Smith"}
>>> ]
>>>
>>> 2) I created table *details *in the postgreSQL
>>> * select * from details ;*
>>> * firstname | lastname*
>>> *-----------+----------*
>>> *(0 rows)*
>>>
>>> 3) ConvertJSONToSQL Processor property details are as below:
>>> *Property  *                                               *Value*
>>> JDBC Connection PoolInfo            DBCPConnectionPool
>>> Statement TypeInfo                      INSERT
>>> Table NameInfo                            details
>>> Catalog NameInfo                         No value set
>>> Translate Field NamesInfo             false
>>> Unmatched Field BehaviorInfo       Ignore Unmatched Fields
>>> Update KeysInfo                           No value set
>>>
>>> But I am getting the below mentioned error in ConvertJSONToSQL Processor.
>>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>>> o.a.n.p.standard.ConvertJSONToSQL
>>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>>> SQL INSERT statement due to
>>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>>> the JSON map to the columns defined by the details table; routing to
>>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>>> fields in the JSON map to the columns defined by the details table
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com>
>>> wrote:
>>>
>>>> I've done something like this by wrapping the command in a shell script:
>>>>
>>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>>
>>>> My use case was slightly different, but I'm pretty sure you can adapt
>>>> the same idea.
>>>>
>>>> -Joey
>>>>
>>>> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I actually need to get the data from a pipe, so the actual command I
>>>> need is: mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet
>>>> Is it possible to use ExecuteProcess for multiple commands?
>>>>
>>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <
>>>> parulagrawal14@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I added custom flume source and when flume source is sending the data
>>>>> to flume sink, below mentioned error is thrown at flume sink.
>>>>>
>>>>>  Administratively Yielded for 1 sec due to processing failure
>>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>>> is OPEN - you must either commit or rollback first
>>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>>> java.lang.IllegalStateException: close() called when transaction is
>>>>> OPEN - you must either commit or rollback first
>>>>>         at
>>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>>> ~[guava-r05.jar:na]
>>>>>         at
>>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>         at
>>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>>         at
>>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>>> ~[na:na]
>>>>>         at
>>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>>> ~[na:na]
>>>>>         at
>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>         at
>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>         at
>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>         at
>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>>         at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> [na:1.7.0_85]
>>>>>         at
>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>>> [na:1.7.0_85]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>>> [na:1.7.0_85]
>>>>>         at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> [na:1.7.0_85]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> [na:1.7.0_85]
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> [na:1.7.0_85]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>> section=7], offset=180436,
>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>>> section=7], offset=180436,
>>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>>> this session (StandardProcessSession[id=218318])
>>>>>
>>>>> Any idea what could be wrong in this.
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
>>>>>
>>>>>
>>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>>>
>>>>>> Hi Parul,
>>>>>>
>>>>>> I think it would be good to keep the conversation going on the users
>>>>>> list, since there are more people who can offer help there, and it also
>>>>>> helps everyone learn new solutions.
>>>>>>
>>>>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>>>>> which could execute "tshark -i eth0 -T pdml".
>>>>>>
>>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>>> place where you need a custom processor. For simple cases you can use an
>>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>>> ReplaceText processor to build a new json document from those extracted
>>>>>> values.
>>>>>>
>>>>>> -Bryan
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <
>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> A little more to add: I need to keep reading the flowfile until the
>>>>>>> END_TAG is received, i.e. we may need to concatenate the flowfile data
>>>>>>> until the END_TAG, and then convert it to JSON and call the PutFile processor.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Parul
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thank you very much again for the guidance provided.
>>>>>>>> Basically I would need a processor which would convert XML file to
>>>>>>>> Json.
>>>>>>>>
>>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>>
>>>>>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>>>>>> would be of PDML format.
>>>>>>>>
>>>>>>>> Now I need a processor which would do the following
>>>>>>>>
>>>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>>>> and END TAG (</packet>)
>>>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>>>>
>>>>>>>> I am not sure if I was able to explain the processor requirement.
>>>>>>>> Would be really great if you could help me in this.
>>>>>>>>
>>>>>>>> Thanks and Regards,
>>>>>>>> Parul
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>>
>>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>>>> sources and sinks to NiFi once you knew that it would meet your
>>>>>>>>> needs
>>>>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>>>> execution model.
>>>>>>>>>
>>>>>>>>> -Joey
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > Hi Parul,
>>>>>>>>> >
>>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>>>>> due to the way the Flume processors load the classes for the sources and
>>>>>>>>> sinks, the jar you deploy to the lib directory also needs to include the
>>>>>>>>> other dependencies your source/sink needs (or they each need to
>>>>>>>>> individually be in lib/ directly).
>>>>>>>>> >
>>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>>> >
>>>>>>>>> > It will contain the custom source and following dependencies all
>>>>>>>>> in one jar:
>>>>>>>>> >
>>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>>> >
>>>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>>> >
>>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>>> > Agent Name = a1
>>>>>>>>> > Source Name = r1
>>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>>> >
>>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>>> >
>>>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>>>> only bringing in flume jars and one guava jar, but for example if another
>>>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>>>> >
>>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>> >
>>>>>>>>> > -Bryan
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> Hello Bryan,
>>>>>>>>> >>
>>>>>>>>> >> Thank you very much for your response.
>>>>>>>>> >>
>>>>>>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>>>>>>> >> I have my own customized source and sink? I followed below
>>>>>>>>> steps to add my own customized source but it did not work.
>>>>>>>>> >>
>>>>>>>>> >> 1) Created Maven project and added customized source.
>>>>>>>>> (flume.jar was created after this step)
>>>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>>>> >>
>>>>>>>>> >> Property           Value
>>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>>> >> Agent Name      a1
>>>>>>>>> >> Source Name         k1.
>>>>>>>>> >>
>>>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError
>>>>>>>>> : /org/apache/flume/PollableSource."
>>>>>>>>> >>
>>>>>>>>> >> Can you please help me in this regard. Any step/configuration I
>>>>>>>>> missed.
>>>>>>>>> >>
>>>>>>>>> >> Thanks and Regards,
>>>>>>>>> >> Parul
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Hello,
>>>>>>>>> >>>
>>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>>> sinks with in NiFi. They don't communicate with an external Flume process.
>>>>>>>>> >>>
>>>>>>>>> >>> In your example you would need an ExecuteFlumeSource
>>>>>>>>> configured to run the netcat source, connected to a ExecuteFlumeSink
>>>>>>>>> configured with the logger.
>>>>>>>>> >>>
>>>>>>>>> >>> -Bryan
>>>>>>>>> >>>
>>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>> Hi,
>>>>>>>>> >>>>
>>>>>>>>> >>>> I was trying to run the NiFi Flume processor with the
>>>>>>>>> >>>> below-mentioned details but could not bring it up.
>>>>>>>>> >>>>
>>>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>>>> >>>> =============================================
>>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>>> >>>>
>>>>>>>>> >>>> # Name the components on this agent
>>>>>>>>> >>>> a1.sources = r1
>>>>>>>>> >>>> a1.sinks = k1
>>>>>>>>> >>>> a1.channels = c1
>>>>>>>>> >>>>
>>>>>>>>> >>>> # Describe/configure the source
>>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>>> >>>>
>>>>>>>>> >>>> # Describe the sink
>>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>>> >>>>
>>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>>> >>>>
>>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>>> >>>> =============================================
>>>>>>>>> >>>>
>>>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>>>> >>>>
>>>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following
>>>>>>>>> configuration was done:
>>>>>>>>> >>>> Property           Value
>>>>>>>>> >>>> Sink Type         logger
>>>>>>>>> >>>> Agent Name      a1
>>>>>>>>> >>>> Sink Name         k1.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>>> >>>> OK
>>>>>>>>> >>>>
>>>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>>>> Request your
>>>>>>>>> >>>> help in this.
>>>>>>>>> >>>> Do i need to change the example.conf file of flume so that
>>>>>>>>> Nifi Flume
>>>>>>>>> >>>> Sink should get the data.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Thanks and Regards,
>>>>>>>>> >>>> Parul
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> --
>>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hi,

I tried with the JSON array above, but I am getting the error below:

2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9]
o.a.n.p.standard.ConvertJSONToSQL
ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse
StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON
due to org.apache.nifi.processor.exception.ProcessException: IOException
thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]:
org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code
73)): expected a valid value (number, String, array, object, 'true',
'false' or 'null')

I also have a large JSON object attached (new.json). Can you guide me on how
to use the ConvertJSONToSQL processor with it?
Should I use another processor before ConvertJSONToSQL so that new.json is
first converted into a flat document of key/value pairs, or an array of
flat documents?

Any help/guidance would be really useful.

Thanks and Regards,
Parul

On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bb...@gmail.com> wrote:

> I think ConvertJSONToSQL expects a flat document of key/value pairs, or an
> array of flat documents. So I think your JSON would be:
>
> [
>     {"firstname":"John", "lastname":"Doe"},
>     {"firstname":"Anna", "lastname":"Smith"}
> ]
>
> The table name will come from the Table Name property.
>
> Let us know if this doesn't work.
>
> -Bryan
>
>
> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Thank you very much for all the support.
>> I could able to convert XML format to json  using custom flume source.
>>
>> Now I would need ConvertJSONToSQL processor to insert data into SQL.
>> I am trying to get hands-on on this processor. Will update you on this.
>> Meanwhile if any example you could share to use this processor for a
>> sample
>> json data, then it would be great.
>>
>> ===============
>>
>> 1) I tried using ConvertJSONToSQL processor with the below sample json
>> file:
>>
>> "details":[
>>     {"firstname":"John", "lastname":"Doe"},
>>     {"firstname":"Anna", "lastname":"Smith"}
>> ]
>>
>> 2) I created table *details *in the postgreSQL
>> * select * from details ;*
>> * firstname | lastname*
>> *-----------+----------*
>> *(0 rows)*
>>
>> 3) ConvertJSONToSQL Processor property details are as below:
>> *Property  *                                               *Value*
>> JDBC Connection Pool                 DBCPConnectionPool
>> Statement Type                       INSERT
>> Table Name                           details
>> Catalog Name                         No value set
>> Translate Field Names                false
>> Unmatched Field Behavior             Ignore Unmatched Fields
>> Update Keys                          No value set
>>
>> But I am getting the below mentioned error in ConvertJSONToSQL Processor.
>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
>> o.a.n.p.standard.ConvertJSONToSQL
>> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
>> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
>> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
>> SQL INSERT statement due to
>> org.apache.nifi.processor.exception.ProcessException: None of the fields in
>> the JSON map to the columns defined by the details table; routing to
>> failure: org.apache.nifi.processor.exception.ProcessException: None of the
>> fields in the JSON map to the columns defined by the details table
>>
>> Thanks and Regards,
>> Parul
>>
>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com>
>> wrote:
>>
>>> I've done something like this by wrapping the command in a shell script:
>>>
>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>
>>> My use case was slightly different, but I'm pretty sure you can adapt
>>> the same idea.
>>>
>>> -Joey
>>>
>>> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I actually need to get the data from pipe.
>>> So the actual command I would need is mkfifo /tmp/packet;tshark -i
>>> ens160 -T pdml >/tmp/packet.
>>> Is it possible to use ExecuteProcessor for multiple commands ?
>>>
>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawal14@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> I added custom flume source and when flume source is sending the data
>>>> to flume sink, below mentioned error is thrown at flume sink.
>>>>
>>>>  Administratively Yielded for 1 sec due to processing failure
>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>>> is OPEN - you must either commit or rollback first
>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>>> java.lang.IllegalStateException: close() called when transaction is
>>>> OPEN - you must either commit or rollback first
>>>>         at
>>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>>> ~[guava-r05.jar:na]
>>>>         at
>>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>         at
>>>> org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>         at
>>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>>> ~[na:na]
>>>>         at
>>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>>> ~[na:na]
>>>>         at
>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>         at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> [na:1.7.0_85]
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> [na:1.7.0_85]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>>> o.a.n.processors.flume.ExecuteFlumeSink
>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>> section=7], offset=180436,
>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>>> section=7], offset=180436,
>>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>>> this session (StandardProcessSession[id=218318])
>>>>
>>>> Any idea what could be wrong in this.
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>>
>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>>
>>>>> Hi Parul,
>>>>>
>>>>> I think it would be good to keep the convo going on the users list
>>>>> since there are more people who can offer help there, and also helps
>>>>> everyone learn new solutions.
>>>>>
>>>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>>>> which could execute "tshark -i eth0 -T pdml".
>>>>>
>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>> place where you need a custom processor. For simple cases you can use an
>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>> ReplaceText processor to build a new json document from those extracted
>>>>> values.
>>>>>
>>>>> -Bryan
>>>>>
>>>>>
>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Little more to add.....
>>>>>>  I need to keep reading the flowfile till END_TAG is received. i.e.
>>>>>> we may need to concatenate the flowfile data till END_TAG.
>>>>>> and then convert it to json and call PutFile() processor.
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thank you very much again for the guidance provided.
>>>>>>> Basically I would need a processor which would convert XML file to
>>>>>>> Json.
>>>>>>>
>>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>
>>>>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>>>>> would be of PDML format.
>>>>>>>
>>>>>>> Now I need a processor which would do the following
>>>>>>>
>>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>>> and END TAG (</packet>)
>>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>>>
>>>>>>> I am not sure if I could able to explain the processor requirement.
>>>>>>> Would be really great if you could help me in this.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Parul
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>>
>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>>> sources and sinks to NiFi once you knew that it would meet your
>>>>>>>> needs
>>>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>>> execution model.
>>>>>>>>
>>>>>>>> -Joey
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Hi Parul,
>>>>>>>> >
>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>>>> due to the way the Flume processors load the classes for the sources and
>>>>>>>> sinks, the jar you deploy to the lib directory also needs to include the
>>>>>>>> other dependencies your source/sink needs (or they each need to
>>>>>>>> individually be in lib/ directly).
>>>>>>>> >
>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>> >
>>>>>>>> > It will contain the custom source and following dependencies all
>>>>>>>> in one jar:
>>>>>>>> >
>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>> >
>>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>>> >
>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>> > Agent Name = a1
>>>>>>>> > Source Name = r1
>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>> >
>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>>> >
>>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>>> only bringing in flume jars and one guava jar, but for example if another
>>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>>> >
>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>> >
>>>>>>>> > -Bryan
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Hello Bryan,
>>>>>>>> >>
>>>>>>>> >> Thank you very much for your response.
>>>>>>>> >>
>>>>>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>>>>>> >> I have my own customized source and sink? I followed below steps
>>>>>>>> to add my own customized source but it did not work.
>>>>>>>> >>
>>>>>>>> >> 1) Created Maven project and added customized source. (flume.jar
>>>>>>>> was created after this step)
>>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>>> >>
>>>>>>>> >> Property           Value
>>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>>> >> Agent Name      a1
>>>>>>>> >> Source Name         k1.
>>>>>>>> >>
>>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError
>>>>>>>> : /org/apache/flume/PollableSource."
>>>>>>>> >>
>>>>>>>> >> Can you please help me in this regard. Any step/configuration I
>>>>>>>> missed.
>>>>>>>> >>
>>>>>>>> >> Thanks and Regards,
>>>>>>>> >> Parul
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hello,
>>>>>>>> >>>
>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>> sinks with in NiFi. They don't communicate with an external Flume process.
>>>>>>>> >>>
>>>>>>>> >>> In your example you would need an ExecuteFlumeSource configured
>>>>>>>> to run the netcat source, connected to a ExecuteFlumeSink configured with
>>>>>>>> the logger.
>>>>>>>> >>>
>>>>>>>> >>> -Bryan
>>>>>>>> >>>
>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Hi,
>>>>>>>> >>>>
>>>>>>>> >>>> I was trying to run Nifi Flume processor with the below
>>>>>>>> mentioned
>>>>>>>> >>>> details but not could bring it up.
>>>>>>>> >>>>
>>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>>> >>>> =============================================
>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>> >>>>
>>>>>>>> >>>> # Name the components on this agent
>>>>>>>> >>>> a1.sources = r1
>>>>>>>> >>>> a1.sinks = k1
>>>>>>>> >>>> a1.channels = c1
>>>>>>>> >>>>
>>>>>>>> >>>> # Describe/configure the source
>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>> >>>>
>>>>>>>> >>>> # Describe the sink
>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>> >>>>
>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>> >>>>
>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>> >>>> =============================================
>>>>>>>> >>>>
>>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>>> >>>>
>>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following
>>>>>>>> configuration was done:
>>>>>>>> >>>> Property           Value
>>>>>>>> >>>> Sink Type         logger
>>>>>>>> >>>> Agent Name      a1
>>>>>>>> >>>> Sink Name         k1.
>>>>>>>> >>>>
>>>>>>>> >>>> Event is sent to the flume using:
>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>> >>>> OK
>>>>>>>> >>>>
>>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>>> Request your
>>>>>>>> >>>> help in this.
>>>>>>>> >>>> Do i need to change the example.conf file of flume so that
>>>>>>>> Nifi Flume
>>>>>>>> >>>> Sink should get the data.
>>>>>>>> >>>>
>>>>>>>> >>>> Thanks and Regards,
>>>>>>>> >>>> Parul
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> --
>>>>>>>> >>> Sent from Gmail Mobile
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Need help in nifi- flume processor

Posted by Bryan Bende <bb...@gmail.com>.
I think ConvertJSONToSQL expects a flat document of key/value pairs, or an
array of flat documents. So I think your JSON would be:

[
    {"firstname":"John", "lastname":"Doe"},
    {"firstname":"Anna", "lastname":"Smith"}
]
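
As a quick sanity check outside NiFi, you can confirm a payload is both
valid JSON and flat before sending it to ConvertJSONToSQL (a Python
sketch, not a NiFi component):

```python
import json

# A fragment like  "details": [...]  is not a complete JSON document on
# its own, which is one reason a JSON parser can reject the flow file
# content. The bare array form below is a valid document.
payload = """
[
    {"firstname": "John", "lastname": "Doe"},
    {"firstname": "Anna", "lastname": "Smith"}
]
"""

records = json.loads(payload)

# ConvertJSONToSQL expects each record to be a flat map of scalar
# key/value pairs -- no nested objects or arrays.
assert all(
    not isinstance(v, (dict, list))
    for record in records
    for v in record.values()
)
print(len(records))  # 2
```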

The table name will come from the Table Name property.
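
For the larger nested new.json case, a pre-processing step along these
lines may help; this is a rough Python sketch with hypothetical key
names, since the actual structure of new.json isn't shown in the thread:

```python
import json

def flatten(obj, parent_key="", sep="_"):
    """Collapse nested dicts into a single level of key/value pairs."""
    flat = {}
    for key, value in obj.items():
        new_key = parent_key + sep + key if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat

# Hypothetical nested record -- illustrative only, not taken from new.json.
nested = {
    "packet": {
        "timestamp": "2015-10-12T05:15:19",
        "layers": {"src_ip": "10.0.0.1", "dst_ip": "10.0.0.2"},
    }
}

# ConvertJSONToSQL can then consume an array of these flat records,
# with column names like "layers_src_ip" in the target table.
flat_records = [flatten(nested["packet"])]
print(json.dumps(flat_records))
```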

Let us know if this doesn't work.

-Bryan


On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <pa...@gmail.com>
wrote:

> Hi,
>
> Thank you very much for all the support.
> I could able to convert XML format to json  using custom flume source.
>
> Now I would need ConvertJSONToSQL processor to insert data into SQL.
> I am trying to get hands-on on this processor. Will update you on this.
> Meanwhile if any example you could share to use this processor for a sample
> json data, then it would be great.
>
> ===============
>
> 1) I tried using ConvertJSONToSQL processor with the below sample json
> file:
>
> "details":[
>     {"firstname":"John", "lastname":"Doe"},
>     {"firstname":"Anna", "lastname":"Smith"}
> ]
>
> 2) I created table *details *in the postgreSQL
> * select * from details ;*
> * firstname | lastname*
> *-----------+----------*
> *(0 rows)*
>
> 3) ConvertJSONToSQL Processor property details are as below:
> *Property  *                                               *Value*
> JDBC Connection Pool                 DBCPConnectionPool
> Statement Type                       INSERT
> Table Name                           details
> Catalog Name                         No value set
> Translate Field Names                false
> Unmatched Field Behavior             Ignore Unmatched Fields
> Update Keys                          No value set
>
> But I am getting the below mentioned error in ConvertJSONToSQL Processor.
> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
> o.a.n.p.standard.ConvertJSONToSQL
> ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
> StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
> section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
> SQL INSERT statement due to
> org.apache.nifi.processor.exception.ProcessException: None of the fields in
> the JSON map to the columns defined by the details table; routing to
> failure: org.apache.nifi.processor.exception.ProcessException: None of the
> fields in the JSON map to the columns defined by the details table
>
> Thanks and Regards,
> Parul
>
> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com> wrote:
>
>> I've done something like this by wrapping the command in a shell script:
>>
>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>
>> My use case was slightly different, but I'm pretty sure you can adapt the
>> same idea.
>>
>> -Joey
>>
>> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I actually need to get the data from pipe.
>> So the actual command I would need is mkfifo /tmp/packet;tshark -i ens160
>> -T pdml >/tmp/packet.
>> Is it possible to use ExecuteProcessor for multiple commands ?
>>
>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I added custom flume source and when flume source is sending the data to
>>> flume sink, below mentioned error is thrown at flume sink.
>>>
>>>  Administratively Yielded for 1 sec due to processing failure
>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>>> Exception: java.lang.IllegalStateException: close() called when transaction
>>> is OPEN - you must either commit or rollback first
>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>>> o.a.n.c.t.ContinuallyRunProcessorTask
>>> java.lang.IllegalStateException: close() called when transaction is OPEN
>>> - you must either commit or rollback first
>>>         at
>>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>>> ~[guava-r05.jar:na]
>>>         at
>>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>         at
>>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>> ~[na:na]
>>>         at
>>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>> ~[na:na]
>>>         at
>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>>         at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> [na:1.7.0_85]
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> [na:1.7.0_85]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>>> o.a.n.processors.flume.ExecuteFlumeSink
>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>> section=7], offset=180436,
>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>> this session (StandardProcessSession[id=218318]); rolling back session:
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>>> section=7], offset=180436,
>>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>>> this session (StandardProcessSession[id=218318])
>>>
>>> Any idea what could be wrong in this.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>
>>>> Hi Parul,
>>>>
>>>> I think it would be good to keep the convo going on the users list
>>>> since there are more people who can offer help there, and also helps
>>>> everyone learn new solutions.
>>>>
>>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>>> which could execute "tshark -i eth0 -T pdml".
>>>>
>>>> There is not currently an XmlToJson processor, so this could be a place
>>>> where you need a custom processor. For simple cases you can use an
>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>> ReplaceText processor to build a new json document from those extracted
>>>> values.
>>>>
>>>> -Bryan
>>>>
>>>>
>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawal14@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Little more to add.....
>>>>>  I need to keep reading the flowfile till END_TAG is received. i.e. we
>>>>> may need to concatenate the flowfile data till END_TAG.
>>>>> and then convert it to json and call PutFile() processor.
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thank you very much again for the guidance provided.
>>>>>> Basically I would need a processor which would convert XML file to
>>>>>> Json.
>>>>>>
>>>>>> Currently I have a flume source which is of type "exec" and the
>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>
>>>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>>>> would be of PDML format.
>>>>>>
>>>>>> Now I need a processor which would do the following
>>>>>>
>>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>>> and END TAG (</packet>)
>>>>>> 2) Once the XML message is formed convert it to json.
>>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>>
>>>>>> I am not sure if I could able to explain the processor requirement.
>>>>>> Would be really great if you could help me in this.
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>>
>>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>> execution model.
>>>>>>>
>>>>>>> -Joey
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Hi Parul,
>>>>>>> >
>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>>> due to the way the Flume processors load the classes for the sources and
>>>>>>> sinks, the jar you deploy to the lib directory also needs to include the
>>>>>>> other dependencies your source/sink needs (or they each need to
>>>>>>> individually be in lib/ directly).
>>>>>>> >
>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>> >
>>>>>>> > It will contain the custom source and following dependencies all
>>>>>>> in one jar:
>>>>>>> >
>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>> >
>>>>>>> > I copied that to NiFi lib, restarted, created an
>>>>>>> ExecuteFlumeSource processor with the following config:
>>>>>>> >
>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>> > Agent Name = a1
>>>>>>> > Source Name = r1
>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>> >
>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>>> >
>>>>>>> > Keep in mind that this could become risky because any classes
>>>>>>> found in the lib directory would be accessible to all NARs in NiFi and
>>>>>>> would be found before classes within a NAR because the parent is checked
>>>>>>> first during class loading. This example isn't too risky because we are
>>>>>>> only bringing in flume jars and one guava jar, but for example if another
>>>>>>> nar uses a different version of guava this is going to cause a problem.
>>>>>>> >
>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>>> can help you get started if you need any guidance going that route.
>>>>>>> >
>>>>>>> > -Bryan
>>>>>>> >
>>>>>>> >
>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> Hello Bryan,
>>>>>>> >>
>>>>>>> >> Thank you very much for your response.
>>>>>>> >>
>>>>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>>>>> >> I have my own customized source and sink? I followed below steps
>>>>>>> to add my own customized source but it did not work.
>>>>>>> >>
>>>>>>> >> 1) Created Maven project and added customized source. (flume.jar
>>>>>>> was created after this step)
>>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>>> >>
>>>>>>> >> Property           Value
>>>>>>> >> Source Type         com.flume.source.Source
>>>>>>> >> Agent Name      a1
>>>>>>> >> Source Name         k1.
>>>>>>> >>
>>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>>>>> /org/apache/flume/PollableSource."
>>>>>>> >>
>>>>>>> >> Can you please help me in this regard. Any step/configuration I
>>>>>>> missed.
>>>>>>> >>
>>>>>>> >> Thanks and Regards,
>>>>>>> >> Parul
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>>>>> wrote:
>>>>>>> >>>
>>>>>>> >>> Hello,
>>>>>>> >>>
>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>> sinks with in NiFi. They don't communicate with an external Flume process.
>>>>>>> >>>
>>>>>>> >>> In your example you would need an ExecuteFlumeSource configured
>>>>>>> to run the netcat source, connected to a ExecuteFlumeSink configured with
>>>>>>> the logger.
>>>>>>> >>>
>>>>>>> >>> -Bryan
>>>>>>> >>>
>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Hi,
>>>>>>> >>>>
>>>>>>> >>>> I was trying to run Nifi Flume processor with the below
>>>>>>> mentioned
>>>>>>> >>>> details but not could bring it up.
>>>>>>> >>>>
>>>>>>> >>>> I already started flume with the sample configuration file
>>>>>>> >>>> =============================================
>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>> >>>>
>>>>>>> >>>> # Name the components on this agent
>>>>>>> >>>> a1.sources = r1
>>>>>>> >>>> a1.sinks = k1
>>>>>>> >>>> a1.channels = c1
>>>>>>> >>>>
>>>>>>> >>>> # Describe/configure the source
>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>> >>>>
>>>>>>> >>>> # Describe the sink
>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>> >>>>
>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>> >>>>
>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>> >>>> =============================================
>>>>>>> >>>>
>>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>>> >>>> --conf-file example.conf --name a1
>>>>>>> -Dflume.root.logger=INFO,console
>>>>>>> >>>>
>>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration
>>>>>>> was done:
>>>>>>> >>>> Property           Value
>>>>>>> >>>> Sink Type         logger
>>>>>>> >>>> Agent Name      a1
>>>>>>> >>>> Sink Name         k1.
>>>>>>> >>>>
>>>>>>> >>>> Event is sent to the flume using:
>>>>>>> >>>> $ telnet localhost 44444
>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>> >>>> Escape character is '^]'.
>>>>>>> >>>> Hello world! <ENTER>
>>>>>>> >>>> OK
>>>>>>> >>>>
>>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>>> Request your
>>>>>>> >>>> help in this.
>>>>>>> >>>> Do i need to change the example.conf file of flume so that Nifi
>>>>>>> Flume
>>>>>>> >>>> Sink should get the data.
>>>>>>> >>>>
>>>>>>> >>>> Thanks and Regards,
>>>>>>> >>>> Parul
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> --
>>>>>>> >>> Sent from Gmail Mobile
>>>>>>> >>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hi,

Thank you very much for all the support.
I was able to convert XML to JSON using a custom flume source.

Now I need the ConvertJSONToSQL processor to insert the data into SQL.
I am getting hands-on with this processor and will update you on it.
Meanwhile, if you could share an example of using this processor with
sample JSON data, that would be great.

===============

1) I tried using the ConvertJSONToSQL processor with the below sample JSON file:

"details":[
    {"firstname":"John", "lastname":"Doe"},
    {"firstname":"Anna", "lastname":"Smith"}
]

2) I created table details in PostgreSQL:
 select * from details;
 firstname | lastname
-----------+----------
(0 rows)

3) ConvertJSONToSQL Processor property details are as below:
Property                                Value
JDBC Connection Pool                    DBCPConnectionPool
Statement Type                          INSERT
Table Name                              details
Catalog Name                            No value set
Translate Field Names                   false
Unmatched Field Behavior                Ignore Unmatched Fields
Update Keys                             No value set

But I am getting the below mentioned error in ConvertJSONToSQL Processor.
2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1]
o.a.n.p.standard.ConvertJSONToSQL
ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert
StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default,
section=1], offset=115045, length=104],offset=0,name=json,size=104] to a
SQL INSERT statement due to
org.apache.nifi.processor.exception.ProcessException: None of the fields in
the JSON map to the columns defined by the details table; routing to
failure: org.apache.nifi.processor.exception.ProcessException: None of the
fields in the JSON map to the columns defined by the details table
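The error is consistent with ConvertJSONToSQL seeing only the top-level "details" key, which matches no column in the table: the processor expects a flat JSON object (or a flat array of such objects) whose keys map directly to column names. A sketch of flattening the posted sample into one flat object per row, assuming the fragment is wrapped in braces so it is valid JSON:

```python
import json

# The posted sample is a fragment; assume it is wrapped in braces to be
# valid JSON. ConvertJSONToSQL maps the keys of a *flat* JSON object
# onto table columns, so the nested "details" wrapper is what prevents
# the column match.
raw = '{"details":[{"firstname":"John","lastname":"Doe"},{"firstname":"Anna","lastname":"Smith"}]}'
rows = json.loads(raw)["details"]

# Emit one flat object per intended SQL row.
for row in rows:
    print(json.dumps(row))  # -> {"firstname": "John", "lastname": "Doe"} etc.
```

Feeding each flat object to ConvertJSONToSQL (for example after a SplitJson processor on `$.details`) should then match the firstname/lastname columns.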

Thanks and Regards,
Parul

On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <jo...@gmail.com> wrote:

> I've done something like this by wrapping the command in a shell script:
>
> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>
> My use case was slightly different, but I'm pretty sure you can adapt the
> same idea.
>
> -Joey
>
> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com> wrote:
>
> Hi,
>
> I actually need to get the data from pipe.
> So the actual command I would need is mkfifo /tmp/packet;tshark -i ens160
> -T pdml >/tmp/packet.
> Is it possible to use ExecuteProcessor for multiple commands ?
>
> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I added custom flume source and when flume source is sending the data to
>> flume sink, below mentioned error is thrown at flume sink.
>>
>>  Administratively Yielded for 1 sec due to processing failure
>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
>> Exception: java.lang.IllegalStateException: close() called when transaction
>> is OPEN - you must either commit or rollback first
>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
>> o.a.n.c.t.ContinuallyRunProcessorTask
>> java.lang.IllegalStateException: close() called when transaction is OPEN
>> - you must either commit or rollback first
>>         at
>> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
>> ~[guava-r05.jar:na]
>>         at
>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at
>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>> ~[na:na]
>>         at
>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>> ~[na:na]
>>         at
>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>> [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> [na:1.7.0_85]
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> [na:1.7.0_85]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
>> o.a.n.processors.flume.ExecuteFlumeSink
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>> section=7], offset=180436,
>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>> this session (StandardProcessSession[id=218318]); rolling back session:
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
>> section=7], offset=180436,
>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
>> this session (StandardProcessSession[id=218318])
>>
>> Any idea what could be wrong in this.
>>
>> Thanks and Regards,
>> Parul
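The IllegalStateException above points at the channel-transaction discipline Flume enforces in BasicTransactionSemantics: a transaction must be committed or rolled back before close() is called. A minimal sketch of the required pattern, shown here in Python for illustration only (the real fix belongs in the custom source's Java process() method against org.apache.flume.Transaction; the stub Transaction class below is hypothetical):

```python
# Minimal stand-in for Flume's Transaction to illustrate the required
# call order: begin -> put -> commit (or rollback) -> close.
# Calling close() while still OPEN raises exactly the error seen above.
class Transaction:
    def __init__(self):
        self.state = "NEW"
    def begin(self):
        self.state = "OPEN"
    def commit(self):
        assert self.state == "OPEN"
        self.state = "COMPLETED"
    def rollback(self):
        assert self.state == "OPEN"
        self.state = "COMPLETED"
    def close(self):
        if self.state == "OPEN":
            raise RuntimeError("close() called when transaction is OPEN")
        self.state = "CLOSED"

def deliver(channel_put, event):
    tx = Transaction()
    tx.begin()
    try:
        channel_put(event)
        tx.commit()
    except Exception:
        tx.rollback()
        raise
    finally:
        tx.close()  # safe: always preceded by commit or rollback

deliver(lambda e: None, {"body": b"hello"})
```

A custom source that puts events on the channel but returns without committing (or rolling back on error) will trip this check when the sink side closes the transaction.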
>>
>>
>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>>
>>> Hi Parul,
>>>
>>> I think it would be good to keep the convo going on the users list since
>>> there are more people who can offer help there, and also helps everyone
>>> learn new solutions.
>>>
>>> The quick answer though is that NiFi has an ExecuteProcess processor
>>> which could execute "tshark -i eth0 -T pdml".
>>>
>>> There is not currently an XmlToJson processor, so this could be a place
>>> where you need a custom processor. For simple cases you can use an
>>> EvaluateXPath processor to extract values from the XML, and then a
>>> ReplaceText processor to build a new json document from those extracted
>>> values.
>>>
>>> -Bryan
>>>
>>>
>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <pa...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Little more to add.....
>>>>  I need to keep reading the flowfile till END_TAG is received. i.e. we
>>>> may need to concatenate the flowfile data till END_TAG.
>>>> and then convert it to json and call PutFile() processor.
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>>
>>>>
>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <
>>>> parulagrawal14@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thank you very much again for the guidance provided.
>>>>> Basically I would need a processor which would convert XML file to
>>>>> Json.
>>>>>
>>>>> Currently I have a flume source which is of type "exec" and the
>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>
>>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>>> would be of PDML format.
>>>>>
>>>>> Now I need a processor which would do the following
>>>>>
>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>> and END TAG (</packet>)
>>>>> 2) Once the XML message is formed convert it to json.
>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>>
>>>>> I am not sure if I could able to explain the processor requirement.
>>>>> Would be really great if you could help me in this.
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
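The accumulate-then-convert step described above can be sketched as follows. The PDML snippet and field names here are illustrative, not captured output, but real tshark PDML has the same <packet>/<proto>/<field> nesting:

```python
import json
import xml.etree.ElementTree as ET

# One complete <packet> element, as it would look after concatenating
# flowfile data from the START tag (<packet>) to the END tag (</packet>).
pdml_packet = """<packet>
  <proto name="ip">
    <field name="ip.src" show="10.0.0.1"/>
    <field name="ip.dst" show="10.0.0.2"/>
  </proto>
</packet>"""

root = ET.fromstring(pdml_packet)
# Map each <proto> to a dict of its <field> name/show attributes.
packet = {
    proto.get("name"): {
        f.get("name"): f.get("show") for f in proto.findall("field")
    }
    for proto in root.findall("proto")
}
print(json.dumps(packet))  # -> {"ip": {"ip.src": "10.0.0.1", "ip.dst": "10.0.0.2"}}
```

The resulting JSON string is what would be written to the flowfile content before routing to PutFile.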
>>>>>
>>>>>
>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>> can help you get started if you need any guidance going that route.
>>>>>>
>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>> execution model.
>>>>>>
>>>>>> -Joey
>>>>>>
>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi Parul,
>>>>>> >
>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>> due to the way the Flume processors load the classes for the sources and
>>>>>> sinks, the jar you deploy to the lib directory also needs to include the
>>>>>> other dependencies your source/sink needs (or they each need to
>>>>>> individually be in lib/ directly).
>>>>>> >
>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>> > https://github.com/bbende/my-flume-source
>>>>>> >
>>>>>> > It will contain the custom source and following dependencies all in
>>>>>> one jar:
>>>>>> >
>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>> >
>>>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
>>>>>> processor with the following config:
>>>>>> >
>>>>>> > Source Type = org.apache.flume.MySource
>>>>>> > Agent Name = a1
>>>>>> > Source Name = r1
>>>>>> > Flume Configuration = a1.sources = r1
>>>>>> >
>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>> >
>>>>>> > Keep in mind that this could become risky because any classes found
>>>>>> in the lib directory would be accessible to all NARs in NiFi and would be
>>>>>> found before classes within a NAR because the parent is checked first
>>>>>> during class loading. This example isn't too risky because we are only
>>>>>> bringing in flume jars and one guava jar, but for example if another nar
>>>>>> uses a different version of guava this is going to cause a problem.
>>>>>> >
>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>>> can help you get started if you need any guidance going that route.
>>>>>> >
>>>>>> > -Bryan
>>>>>> >
>>>>>> >
>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Hello Bryan,
>>>>>> >>
>>>>>> >> Thank you very much for your response.
>>>>>> >>
>>>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>>>> >> I have my own customized source and sink? I followed below steps
>>>>>> to add my own customized source but it did not work.
>>>>>> >>
>>>>>> >> 1) Created Maven project and added customized source. (flume.jar
>>>>>> was created after this step)
>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>> >>
>>>>>> >> Property           Value
>>>>>> >> Source Type         com.flume.source.Source
>>>>>> >> Agent Name      a1
>>>>>> >> Source Name         k1.
>>>>>> >>
>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>>>> /org/apache/flume/PollableSource."
>>>>>> >>
>>>>>> >> Can you please help me in this regard. Any step/configuration I
>>>>>> missed.
>>>>>> >>
>>>>>> >> Thanks and Regards,
>>>>>> >> Parul
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>>>> with in NiFi. They don't communicate with an external Flume process.
>>>>>> >>>
>>>>>> >>> In your example you would need an ExecuteFlumeSource configured
>>>>>> to run the netcat source, connected to a ExecuteFlumeSink configured with
>>>>>> the logger.
>>>>>> >>>
>>>>>> >>> -Bryan
>>>>>> >>>
>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>>> parulagrawal14@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>> Hi,
>>>>>> >>>>
>>>>>> >>>> I was trying to run Nifi Flume processor with the below mentioned
>>>>>> >>>> details but not could bring it up.
>>>>>> >>>>
>>>>>> >>>> I already started flume with the sample configuration file
>>>>>> >>>> =============================================
>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>> >>>>
>>>>>> >>>> # Name the components on this agent
>>>>>> >>>> a1.sources = r1
>>>>>> >>>> a1.sinks = k1
>>>>>> >>>> a1.channels = c1
>>>>>> >>>>
>>>>>> >>>> # Describe/configure the source
>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>> >>>>
>>>>>> >>>> # Describe the sink
>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>> >>>>
>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>> >>>> a1.channels.c1.type = memory
>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>> >>>>
>>>>>> >>>> # Bind the source and sink to the channel
>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>> >>>> =============================================
>>>>>> >>>>
>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>> >>>> --conf-file example.conf --name a1
>>>>>> -Dflume.root.logger=INFO,console
>>>>>> >>>>
>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration
>>>>>> was done:
>>>>>> >>>> Property           Value
>>>>>> >>>> Sink Type         logger
>>>>>> >>>> Agent Name      a1
>>>>>> >>>> Sink Name         k1.
>>>>>> >>>>
>>>>>> >>>> Event is sent to the flume using:
>>>>>> >>>> $ telnet localhost 44444
>>>>>> >>>> Trying 127.0.0.1...
>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>> >>>> Escape character is '^]'.
>>>>>> >>>> Hello world! <ENTER>
>>>>>> >>>> OK
>>>>>> >>>>
>>>>>> >>>> But I could not get any data in the nifi flume processor.
>>>>>> Request your
>>>>>> >>>> help in this.
>>>>>> >>>> Do i need to change the example.conf file of flume so that Nifi
>>>>>> Flume
>>>>>> >>>> Sink should get the data.
>>>>>> >>>>
>>>>>> >>>> Thanks and Regards,
>>>>>> >>>> Parul
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Sent from Gmail Mobile
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Need help in nifi- flume processor

Posted by Joey Echeverria <jo...@gmail.com>.
I've done something like this by wrapping the command in a shell script:

http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/

My use case was slightly different, but I'm pretty sure you can adapt the same idea. 
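A minimal sketch of such a wrapper, generated here with Python so ExecuteProcess can be pointed at a single command; the fifo path and interface name (ens160) come from the thread and are assumptions:

```python
import os
import stat

# Contents of a one-shot wrapper script that ExecuteProcess can invoke
# as a single command. It creates the fifo if needed, then replaces
# itself with the tshark capture writing into the fifo.
wrapper = """#!/bin/sh
FIFO=/tmp/packet
[ -p "$FIFO" ] || mkfifo "$FIFO"
exec tshark -i ens160 -T pdml > "$FIFO"
"""

path = "/tmp/tshark-wrapper.sh"
with open(path, "w") as f:
    f.write(wrapper)
# Mark the script executable (documented os.chmod recipe).
os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR)
print(path)
```

ExecuteProcess then runs `/tmp/tshark-wrapper.sh` as its single Command, with no arguments, instead of the `mkfifo ...; tshark ...` pair.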

-Joey

> On Oct 10, 2015, at 03:52, Parul Agrawal <pa...@gmail.com> wrote:
> 
> Hi,
> 
> I actually need to get the data from pipe.
> So the actual command I would need is mkfifo /tmp/packet;tshark -i ens160 -T pdml >/tmp/packet.
> Is it possible to use ExecuteProcessor for multiple commands ? 
> 
>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <pa...@gmail.com> wrote:
>> Hi,
>> 
>> I added custom flume source and when flume source is sending the data to flume sink, below mentioned error is thrown at flume sink.
>> 
>>  Administratively Yielded for 1 sec due to processing failure
>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught Exception: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
>> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>>         at com.google.common.base.Preconditions.checkState(Preconditions.java:172) ~[guava-r05.jar:na]
>>         at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139) ~[na:na]
>>         at org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148) ~[na:na]
>>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_85]
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_85]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_85]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_85]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_85]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_85]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.flume.ExecuteFlumeSink ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318])
>> 
>> Any idea what could be wrong in this.
>> 
>> Thanks and Regards,
>> Parul
>> 
>> 
>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>>> Hi Parul,
>>> 
>>> I think it would be good to keep the convo going on the users list since there are more people who can offer help there, and also helps everyone learn new solutions.
>>> 
>>> The quick answer though is that NiFi has an ExecuteProcess processor which could execute "tshark -i eth0 -T pdml". 
>>> 
>>> There is not currently an XmlToJson processor, so this could be a place where you need a custom processor. For simple cases you can use an EvaluateXPath processor to extract values from the XML, and then a ReplaceText processor to build a new json document from those extracted values.
>>> 
>>> -Bryan
>>> 
>>> 
>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>>> Hi,
>>>> 
>>>> Little more to add.....
>>>>  I need to keep reading the flowfile till END_TAG is received. i.e. we may need to concatenate the flowfile data till END_TAG.
>>>> and then convert it to json and call PutFile() processor.
>>>> 
>>>> Thanks and Regards,
>>>> Parul
>>>> 
>>>> 
>>>> 
>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> Thank you very much again for the guidance provided.
>>>>> Basically I would need a processor which would convert XML file to Json.
>>>>> 
>>>>> Currently I have a flume source which is of type "exec" and the command used is "tshark -i eth0 -T pdml".
>>>>> 
>>>>> Here Flume source keeps sending data to flume sink. This flow file would be of PDML format.
>>>>> 
>>>>> Now I need a processor which would do the following
>>>>> 
>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>> and END TAG (</packet>)
>>>>> 2) Once the XML message is formed convert it to json.
>>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>> 
>>>>> I am not sure if I could able to explain the processor requirement. 
>>>>> Would be really great if you could help me in this.
>>>>> 
>>>>> Thanks and Regards,
>>>>> Parul 
>>>>> 
>>>>> 
>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com> wrote:
>>>>>> > If you plan to use NiFi for the long term, it might be worth investing in converting your custom Flume components to NiFi processors. We can help you get started if you need any guidance going that route.
>>>>>> 
>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>> execution model.
>>>>>> 
>>>>>> -Joey
>>>>>> 
>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi Parul,
>>>>>> >
>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due to the way the Flume processors load the classes for the sources and sinks, the jar you deploy to the lib directory also needs to include the other dependencies your source/sink needs (or they each need to individually be in lib/ directly).
>>>>>> >
>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>> > https://github.com/bbende/my-flume-source
>>>>>> >
>>>>>> > It will contain the custom source and following dependencies all in one jar:
>>>>>> >
>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>> >
>>>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource processor with the following config:
>>>>>> >
>>>>>> > Source Type = org.apache.flume.MySource
>>>>>> > Agent Name = a1
>>>>>> > Source Name = r1
>>>>>> > Flume Configuration = a1.sources = r1
>>>>>> >
>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>> >
>>>>>> > Keep in mind that this could become risky because any classes found in the lib directory would be accessible to all NARs in NiFi and would be found before classes within a NAR because the parent is checked first during class loading. This example isn't too risky because we are only bringing in flume jars and one guava jar, but for example if another nar uses a different version of guava this is going to cause a problem.
>>>>>> >
>>>>>> > If you plan to use NiFi for the long term, it might be worth investing in converting your custom Flume components to NiFi processors. We can help you get started if you need any guidance going that route.
>>>>>> >
>>>>>> > -Bryan
>>>>>> >
>>>>>> >
>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Hello Bryan,
>>>>>> >>
>>>>>> >> Thank you very much for your response.
>>>>>> >>
>>>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>>>> >> I have my own customized source and sink? I followed below steps to add my own customized source but it did not work.
>>>>>> >>
>>>>>> >> 1) Created Maven project and added customized source. (flume.jar was created after this step)
>>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>>> >> 3) Added flume source processor with the below configuration
>>>>>> >>
>>>>>> >> Property           Value
>>>>>> >> Source Type         com.flume.source.Source
>>>>>> >> Agent Name      a1
>>>>>> >> Source Name         k1.
>>>>>> >>
>>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>>>>> >>
>>>>>> >> Can you please help me in this regard. Any step/configuration I missed.
>>>>>> >>
>>>>>> >> Thanks and Regards,
>>>>>> >> Parul
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks with in NiFi. They don't communicate with an external Flume process.
>>>>>> >>>
>>>>>> >>> In your example you would need an ExecuteFlumeSource configured to run the netcat source, connected to a ExecuteFlumeSink configured with the logger.
>>>>>> >>>
>>>>>> >>> -Bryan
>>>>>> >>>
>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <pa...@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>> Hi,
>>>>>> >>>>
>>>>>> >>>> I was trying to run Nifi Flume processor with the below mentioned
>>>>>> >>>> details but not could bring it up.
>>>>>> >>>>
>>>>>> >>>> I already started flume with the sample configuration file
>>>>>> >>>> =============================================
>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>> >>>>
>>>>>> >>>> # Name the components on this agent
>>>>>> >>>> a1.sources = r1
>>>>>> >>>> a1.sinks = k1
>>>>>> >>>> a1.channels = c1
>>>>>> >>>>
>>>>>> >>>> # Describe/configure the source
>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>> >>>>
>>>>>> >>>> # Describe the sink
>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>> >>>>
>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>> >>>> a1.channels.c1.type = memory
>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>> >>>>
>>>>>> >>>> # Bind the source and sink to the channel
>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>> >>>> =============================================
>>>>>> >>>>
>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>> >>>> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>>>> >>>>
>>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration was done:
>>>>>> >>>> Property           Value
>>>>>> >>>> Sink Type         logger
>>>>>> >>>> Agent Name      a1
>>>>>> >>>> Sink Name         k1.
>>>>>> >>>>
>>>>>> >>>> Event is sent to the flume using:
>>>>>> >>>> $ telnet localhost 44444
>>>>>> >>>> Trying 127.0.0.1...
>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>> >>>> Escape character is '^]'.
>>>>>> >>>> Hello world! <ENTER>
>>>>>> >>>> OK
>>>>>> >>>>
>>>>>> >>>> But I could not get any data in the NiFi Flume processor. Request your
>>>>>> >>>> help with this.
>>>>>> >>>> Do I need to change the example.conf file of Flume so that the NiFi
>>>>>> >>>> Flume sink gets the data?
>>>>>> >>>>
>>>>>> >>>> Thanks and Regards,
>>>>>> >>>> Parul
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Sent from Gmail Mobile
>>>>>> >>
>>>>>> >>
>>>>>> >
> 

Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hi,

I actually need to get the data from a pipe, so the full command I need is
"mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet".
Is it possible to use the ExecuteProcess processor for multiple commands?
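ExecuteProcess runs a single executable, so running two commands usually means handing the whole command line to a shell, e.g. (hypothetical configuration, check the processor docs) Command = /bin/bash with Command Arguments = -c "mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet". A small Java sketch of the same shell-wrapping idea, using harmless stand-in commands since tshark needs a capture interface:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ShellWrap {
    public static void main(String[] args) throws Exception {
        // Two commands run as one process: the shell's -c flag accepts a whole
        // command list, which is how a single-command runner like ExecuteProcess
        // can still drive "mkfifo ...; tshark ... > ..." pipelines.
        Process p = new ProcessBuilder("bash", "-c", "echo first; echo second").start();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line); // prints "first" then "second"
            }
        }
        p.waitFor();
    }
}
```

Note that anything written into a fifo still has to be read by some other process; since ExecuteProcess captures the command's stdout into FlowFiles, it may be simpler to let it run tshark directly and skip the fifo entirely.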

On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <pa...@gmail.com>
wrote:

> Hi,
>
> I added custom flume source and when flume source is sending the data to
> flume sink, below mentioned error is thrown at flume sink.
>
>  Administratively Yielded for 1 sec due to processing failure
> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
> Exception: java.lang.IllegalStateException: close() called when transaction
> is OPEN - you must either commit or rollback first
> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
> o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.IllegalStateException: close() called when transaction is OPEN -
> you must either commit or rollback first
>         at
> com.google.common.base.Preconditions.checkState(Preconditions.java:172)
> ~[guava-r05.jar:na]
>         at
> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
> ~[flume-ng-core-1.6.0.jar:1.6.0]
>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
> ~[flume-ng-core-1.6.0.jar:1.6.0]
>         at
> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
> ~[na:na]
>         at
> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
> ~[na:na]
>         at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
> ~[nifi-framework-core-0.3.0.jar:0.3.0]
>         at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
> [nifi-framework-core-0.3.0.jar:0.3.0]
>         at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
> [nifi-framework-core-0.3.0.jar:0.3.0]
>         at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
> [nifi-framework-core-0.3.0.jar:0.3.0]
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> [na:1.7.0_85]
>         at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> [na:1.7.0_85]
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> [na:1.7.0_85]
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [na:1.7.0_85]
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> [na:1.7.0_85]
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> [na:1.7.0_85]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
> o.a.n.processors.flume.ExecuteFlumeSink
> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
> section=7], offset=180436,
> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
> this session (StandardProcessSession[id=218318]); rolling back session:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
> section=7], offset=180436,
> length=14078],offset=0,name=8311685679474355,size=14078] is not known in
> this session (StandardProcessSession[id=218318])
>
> Any idea what could be wrong in this.
>
> Thanks and Regards,
> Parul
>
>
> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:
>
>> Hi Parul,
>>
>> I think it would be good to keep the convo going on the users list since
>> there are more people who can offer help there, and also helps everyone
>> learn new solutions.
>>
>> The quick answer though is that NiFi has an ExecuteProcess processor
>> which could execute "tshark -i eth0 -T pdml".
>>
>> There is not currently an XmlToJson processor, so this could be a place
>> where you need a custom processor. For simple cases you can use an
>> EvaluateXPath processor to extract values from the XML, and then a
>> ReplaceText processor to build a new json document from those extracted
>> values.
>>
>> -Bryan
>>
>>
>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Little more to add.....
>>>  I need to keep reading the flowfile till END_TAG is received. i.e. we
>>> may need to concatenate the flowfile data till END_TAG.
>>> and then convert it to json and call PutFile() processor.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>>
>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawal14@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> Thank you very much again for the guidance provided.
>>>> Basically I would need a processor which would convert XML file to Json.
>>>>
>>>> Currently I have a flume source which is of type "exec" and the command
>>>> used is "tshark -i eth0 -T pdml".
>>>>
>>>> Here Flume source keeps sending data to flume sink. This flow file
>>>> would be of PDML format.
>>>>
>>>> Now I need a processor which would do the following
>>>>
>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>> and END TAG (</packet>)
>>>> 2) Once the XML message is formed convert it to json.
>>>> 3) Place a json file to local directory using PutFile() processor.
>>>>
>>>> I am not sure if I was able to explain the processor requirement clearly.
>>>> It would be really great if you could help me with this.
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>>
>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>> can help you get started if you need any guidance going that route.
>>>>>
>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>> execution model.
>>>>>
>>>>> -Joey
>>>>>
>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Parul,
>>>>> >
>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due
>>>>> to the way the Flume processors load the classes for the sources and sinks,
>>>>> the jar you deploy to the lib directory also needs to include the other
>>>>> dependencies your source/sink needs (or they each need to individually be
>>>>> in lib/ directly).
>>>>> >
>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>> > https://github.com/bbende/my-flume-source
>>>>> >
>>>>> > It will contain the custom source and following dependencies all in
>>>>> one jar:
>>>>> >
>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>> >
>>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
>>>>> processor with the following config:
>>>>> >
>>>>> > Source Type = org.apache.flume.MySource
>>>>> > Agent Name = a1
>>>>> > Source Name = r1
>>>>> > Flume Configuration = a1.sources = r1
>>>>> >
>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>> >
>>>>> > Keep in mind that this could become risky because any classes found
>>>>> in the lib directory would be accessible to all NARs in NiFi and would be
>>>>> found before classes within a NAR because the parent is checked first
>>>>> during class loading. This example isn't too risky because we are only
>>>>> bringing in flume jars and one guava jar, but for example if another nar
>>>>> uses a different version of guava this is going to cause a problem.
>>>>> >
>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>> investing in converting your custom Flume components to NiFi processors. We
>>>>> can help you get started if you need any guidance going that route.
>>>>> >
>>>>> > -Bryan
>>>>> >
>>>>> >
>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>> >>
>>>>> >> Hello Bryan,
>>>>> >>
>>>>> >> Thank you very much for your response.
>>>>> >>
>>>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>>>> >> I have my own customized source and sink. I followed the steps below to
>>>>>> >> add my own customized source, but it did not work.
>>>>> >>
>>>>> >> 1) Created Maven project and added customized source. (flume.jar
>>>>> was created after this step)
>>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>>> >> 3) Added flume source processor with the below configuration
>>>>> >>
>>>>> >> Property           Value
>>>>> >> Source Type         com.flume.source.Source
>>>>> >> Agent Name      a1
>>>>> >> Source Name         k1.
>>>>> >>
>>>>> >> But I am getting the below error in Flume Source Processor.
>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>>> /org/apache/flume/PollableSource."
>>>>> >>
>>>>> >> Can you please help me in this regard. Any step/configuration I
>>>>> missed.
>>>>> >>
>>>>> >> Thanks and Regards,
>>>>> >> Parul
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Hello,
>>>>> >>>
>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>>> within NiFi. They don't communicate with an external Flume process.
>>>>> >>>
>>>>> >>> In your example you would need an ExecuteFlumeSource configured to
>>>>> run the netcat source, connected to an ExecuteFlumeSink configured with the
>>>>> logger.
>>>>> >>>
>>>>> >>> -Bryan
>>>>> >>>
>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>>> parulagrawal14@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Hi,
>>>>> >>>>
>>>>> >>>> I was trying to run the NiFi Flume processor with the details
>>>>> >>>> mentioned below, but could not bring it up.
>>>>> >>>>
>>>>> >>>> I already started flume with the sample configuration file
>>>>> >>>> =============================================
>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>> >>>>
>>>>> >>>> # Name the components on this agent
>>>>> >>>> a1.sources = r1
>>>>> >>>> a1.sinks = k1
>>>>> >>>> a1.channels = c1
>>>>> >>>>
>>>>> >>>> # Describe/configure the source
>>>>> >>>> a1.sources.r1.type = netcat
>>>>> >>>> a1.sources.r1.bind = localhost
>>>>> >>>> a1.sources.r1.port = 44444
>>>>> >>>>
>>>>> >>>> # Describe the sink
>>>>> >>>> a1.sinks.k1.type = logger
>>>>> >>>>
>>>>> >>>> # Use a channel which buffers events in memory
>>>>> >>>> a1.channels.c1.type = memory
>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>> >>>>
>>>>> >>>> # Bind the source and sink to the channel
>>>>> >>>> a1.sources.r1.channels = c1
>>>>> >>>> a1.sinks.k1.channel = c1
>>>>> >>>> =============================================
>>>>> >>>>
>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>> >>>> --conf-file example.conf --name a1
>>>>> -Dflume.root.logger=INFO,console
>>>>> >>>>
>>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration
>>>>> was done:
>>>>> >>>> Property           Value
>>>>> >>>> Sink Type         logger
>>>>> >>>> Agent Name      a1
>>>>> >>>> Sink Name         k1.
>>>>> >>>>
>>>>> >>>> Event is sent to the flume using:
>>>>> >>>> $ telnet localhost 44444
>>>>> >>>> Trying 127.0.0.1...
>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>> >>>> Escape character is '^]'.
>>>>> >>>> Hello world! <ENTER>
>>>>> >>>> OK
>>>>> >>>>
>>>>> >>>> But I could not get any data in the NiFi Flume processor. Request
>>>>> your
>>>>> >>>> help with this.
>>>>> >>>> Do I need to change the example.conf file of Flume so that the NiFi
>>>>> >>>> Flume sink gets the data?
>>>>> >>>>
>>>>> >>>> Thanks and Regards,
>>>>> >>>> Parul
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>> Sent from Gmail Mobile
>>>>> >>
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>
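On the earlier question in this thread of turning each PDML <packet> into JSON: since NiFi has no XmlToJson processor, the core of a custom processor could be a small JDK-only DOM walk along these lines. This is a sketch under simplifying assumptions: attributes become "@"-prefixed keys, text nodes are ignored, and repeated sibling elements (such as PDML's multiple <field> tags) would collide as duplicate keys, so real use needs array handling and string escaping:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class XmlToJson {
    // Convert one element to a JSON object: attributes as "@name" keys,
    // child elements as nested objects. Deliberately minimal (no arrays,
    // no escaping) to keep the shape of the idea visible.
    static String toJson(Element e) {
        StringBuilder sb = new StringBuilder("{");
        NamedNodeMap attrs = e.getAttributes();
        boolean first = true;
        for (int i = 0; i < attrs.getLength(); i++) {
            Node a = attrs.item(i);
            sb.append(first ? "" : ",")
              .append("\"@").append(a.getNodeName()).append("\":\"")
              .append(a.getNodeValue()).append("\"");
            first = false;
        }
        NodeList kids = e.getChildNodes();
        for (int i = 0; i < kids.getLength(); i++) {
            Node n = kids.item(i);
            if (n.getNodeType() == Node.ELEMENT_NODE) {
                sb.append(first ? "" : ",")
                  .append("\"").append(n.getNodeName()).append("\":")
                  .append(toJson((Element) n));
                first = false;
            }
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) throws Exception {
        // A tiny stand-in for one tshark PDML packet.
        String pdml = "<packet><proto name=\"ip\"><field name=\"src\" show=\"10.0.0.1\"/></proto></packet>";
        Element root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(pdml.getBytes(StandardCharsets.UTF_8)))
                .getDocumentElement();
        System.out.println(toJson(root));
    }
}
```

In a custom processor this logic would run inside onTrigger over the buffered <packet>...</packet> text, with the JSON written to the outgoing FlowFile and then routed to PutFile.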

Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hi,

I added a custom Flume source, and when it sends data to the Flume sink,
the error below is thrown at the sink.

 Administratively Yielded for 1 sec due to processing failure
2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
Exception: java.lang.IllegalStateException: close() called when transaction
is OPEN - you must either commit or rollback first
2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: close() called when transaction is OPEN -
you must either commit or rollback first
        at
com.google.common.base.Preconditions.checkState(Preconditions.java:172)
~[guava-r05.jar:na]
        at
org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
~[flume-ng-core-1.6.0.jar:1.6.0]
        at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
~[flume-ng-core-1.6.0.jar:1.6.0]
        at
org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
~[na:na]
        at
org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
~[na:na]
        at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
~[nifi-framework-core-0.3.0.jar:0.3.0]
        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
[nifi-framework-core-0.3.0.jar:0.3.0]
        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
[nifi-framework-core-0.3.0.jar:0.3.0]
        at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
[nifi-framework-core-0.3.0.jar:0.3.0]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
[na:1.7.0_85]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
[na:1.7.0_85]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
[na:1.7.0_85]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[na:1.7.0_85]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_85]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_85]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
o.a.n.processors.flume.ExecuteFlumeSink
ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
due to org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
section=7], offset=180436,
length=14078],offset=0,name=8311685679474355,size=14078] is not known in
this session (StandardProcessSession[id=218318]); rolling back session:
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
section=7], offset=180436,
length=14078],offset=0,name=8311685679474355,size=14078] is not known in
this session (StandardProcessSession[id=218318])

Any idea what could be wrong here?
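For context on the exception itself: Flume's BasicTransactionSemantics enforces a state machine in which close() is only legal after commit() or rollback(). The stub below is not the real Flume class, just a self-contained imitation of that contract, plus the standard sink-side pattern (commit on success, rollback on failure, close in finally) that never closes an OPEN transaction:

```java
// Simplified stand-in for org.apache.flume.channel.BasicTransactionSemantics.
// It only models the state checks, enough to reproduce the logged error.
public class TxDemo {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    static class Transaction {
        State state = State.NEW;
        void begin()    { require(state == State.OPEN == false && state == State.NEW, "begin() called when transaction is " + state); state = State.OPEN; }
        void commit()   { require(state == State.OPEN, "commit() called when transaction is " + state); state = State.COMPLETED; }
        void rollback() { require(state == State.OPEN, "rollback() called when transaction is " + state); state = State.COMPLETED; }
        void close() {
            // This is the check that produced the error in the log above.
            require(state == State.NEW || state == State.COMPLETED,
                    "close() called when transaction is " + state + " - you must either commit or rollback first");
            state = State.CLOSED;
        }
        static void require(boolean ok, String msg) {
            if (!ok) throw new IllegalStateException(msg);
        }
    }

    // The canonical sink-side usage: commit on success, rollback on failure,
    // close in finally, so close() never sees an OPEN transaction.
    public static void process(Transaction tx, Runnable work) {
        tx.begin();
        try {
            work.run();
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        Transaction ok = new Transaction();
        process(ok, () -> {});        // commits, then closes cleanly
        System.out.println("clean path: " + ok.state);

        Transaction bad = new Transaction();
        bad.begin();
        try {
            bad.close();              // reproduces the logged IllegalStateException
        } catch (IllegalStateException e) {
            System.out.println("bad path: " + e.getMessage());
        }
    }
}
```

So the stack trace suggests the custom source and LoggerSink are leaving a transaction open somewhere on the failure path; checking that every begin() is paired with a commit() or rollback() before close() is the place to start.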

Thanks and Regards,
Parul


On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bb...@gmail.com> wrote:

> Hi Parul,
>
> I think it would be good to keep the convo going on the users list since
> there are more people who can offer help there, and also helps everyone
> learn new solutions.
>
> The quick answer though is that NiFi has an ExecuteProcess processor which
> could execute "tshark -i eth0 -T pdml".
>
> There is not currently an XmlToJson processor, so this could be a place
> where you need a custom processor. For simple cases you can use an
> EvaluateXPath processor to extract values from the XML, and then a
> ReplaceText processor to build a new json document from those extracted
> values.
>
> -Bryan
>
>
> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Little more to add.....
>>  I need to keep reading the flowfile till END_TAG is received. i.e. we
>> may need to concatenate the flowfile data till END_TAG.
>> and then convert it to json and call PutFile() processor.
>>
>> Thanks and Regards,
>> Parul
>>
>>
>>
>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <pa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thank you very much again for the guidance provided.
>>> Basically I would need a processor which would convert XML file to Json.
>>>
>>> Currently I have a flume source which is of type "exec" and the command
>>> used is "tshark -i eth0 -T pdml".
>>>
>>> Here Flume source keeps sending data to flume sink. This flow file would
>>> be of PDML format.
>>>
>>> Now I need a processor which would do the following
>>>
>>> 1) Form a complete XML file based on START TAG (<packet>)
>>> and END TAG (</packet>)
>>> 2) Once the XML message is formed convert it to json.
>>> 3) Place a json file to local directory using PutFile() processor.
>>>
>>> I am not sure if I was able to explain the processor requirement clearly.
>>> It would be really great if you could help me with this.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <jo...@gmail.com>
>>> wrote:
>>>
>>>> > If you plan to use NiFi for the long term, it might be worth
>>>> investing in converting your custom Flume components to NiFi processors. We
>>>> can help you get started if you need any guidance going that route.
>>>>
>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>> really useful if you have a complex Flume flow and want to migrate
>>>> only parts of it over to NiFi at a time. I would port any custom
>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>> well. NiFi has a lot of documentation on writing processors and the
>>>> concepts map pretty well if you're already familiar with Flume's
>>>> execution model.
>>>>
>>>> -Joey
>>>>
>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com> wrote:
>>>> >
>>>> > Hi Parul,
>>>> >
>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due
>>>> to the way the Flume processors load the classes for the sources and sinks,
>>>> the jar you deploy to the lib directory also needs to include the other
>>>> dependencies your source/sink needs (or they each need to individually be
>>>> in lib/ directly).
>>>> >
>>>> > So here is a sample project I created that makes a shaded jar:
>>>> > https://github.com/bbende/my-flume-source
>>>> >
>>>> > It will contain the custom source and following dependencies all in
>>>> one jar:
>>>> >
>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>> >
>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
>>>> processor with the following config:
>>>> >
>>>> > Source Type = org.apache.flume.MySource
>>>> > Agent Name = a1
>>>> > Source Name = r1
>>>> > Flume Configuration = a1.sources = r1
>>>> >
>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>> >
>>>> > Keep in mind that this could become risky because any classes found
>>>> in the lib directory would be accessible to all NARs in NiFi and would be
>>>> found before classes within a NAR because the parent is checked first
>>>> during class loading. This example isn't too risky because we are only
>>>> bringing in flume jars and one guava jar, but for example if another nar
>>>> uses a different version of guava this is going to cause a problem.
>>>> >
>>>> > If you plan to use NiFi for the long term, it might be worth
>>>> investing in converting your custom Flume components to NiFi processors. We
>>>> can help you get started if you need any guidance going that route.
>>>> >
>>>> > -Bryan
>>>> >
>>>> >
>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>> parulagrawal14@gmail.com> wrote:
>>>> >>
>>>> >> Hello Bryan,
>>>> >>
>>>> >> Thank you very much for your response.
>>>> >>
>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>> >> I have my own customized source and sink. I followed the steps below to
>>>> >> add my own customized source, but it did not work.
>>>> >>
>>>> >> 1) Created Maven project and added customized source. (flume.jar was
>>>> created after this step)
>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>> >> 3) Added flume source processor with the below configuration
>>>> >>
>>>> >> Property           Value
>>>> >> Source Type         com.flume.source.Source
>>>> >> Agent Name      a1
>>>> >> Source Name         k1.
>>>> >>
>>>> >> But I am getting the below error in Flume Source Processor.
>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>> /org/apache/flume/PollableSource."
>>>> >>
>>>> >> Can you please help me in this regard. Any step/configuration I
>>>> missed.
>>>> >>
>>>> >> Thanks and Regards,
>>>> >> Parul
>>>> >>
>>>> >>
>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com>
>>>> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>> within NiFi. They don't communicate with an external Flume process.
>>>> >>>
>>>> >>> In your example you would need an ExecuteFlumeSource configured to
>>>> run the netcat source, connected to an ExecuteFlumeSink configured with the
>>>> logger.
>>>> >>>
>>>> >>> -Bryan
>>>> >>>
>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>> parulagrawal14@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Hi,
>>>> >>>>
>>>> >>>> I was trying to run the NiFi Flume processor with the details
>>>> >>>> mentioned below, but could not bring it up.
>>>> >>>>
>>>> >>>> I already started flume with the sample configuration file
>>>> >>>> =============================================
>>>> >>>> # example.conf: A single-node Flume configuration
>>>> >>>>
>>>> >>>> # Name the components on this agent
>>>> >>>> a1.sources = r1
>>>> >>>> a1.sinks = k1
>>>> >>>> a1.channels = c1
>>>> >>>>
>>>> >>>> # Describe/configure the source
>>>> >>>> a1.sources.r1.type = netcat
>>>> >>>> a1.sources.r1.bind = localhost
>>>> >>>> a1.sources.r1.port = 44444
>>>> >>>>
>>>> >>>> # Describe the sink
>>>> >>>> a1.sinks.k1.type = logger
>>>> >>>>
>>>> >>>> # Use a channel which buffers events in memory
>>>> >>>> a1.channels.c1.type = memory
>>>> >>>> a1.channels.c1.capacity = 1000
>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>> >>>>
>>>> >>>> # Bind the source and sink to the channel
>>>> >>>> a1.sources.r1.channels = c1
>>>> >>>> a1.sinks.k1.channel = c1
>>>> >>>> =============================================
>>>> >>>>
>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>> >>>> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>> >>>>
>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration
>>>> was done:
>>>> >>>> Property           Value
>>>> >>>> Sink Type         logger
>>>> >>>> Agent Name      a1
>>>> >>>> Sink Name         k1.
>>>> >>>>
>>>> >>>> Event is sent to the flume using:
>>>> >>>> $ telnet localhost 44444
>>>> >>>> Trying 127.0.0.1...
>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>> >>>> Escape character is '^]'.
>>>> >>>> Hello world! <ENTER>
>>>> >>>> OK
>>>> >>>>
>>>> >>>> But I could not get any data in the NiFi Flume processor. Request
>>>> your
>>>> >>>> help with this.
>>>> >>>> Do I need to change the example.conf file of Flume so that the NiFi
>>>> >>>> Flume sink gets the data?
>>>> >>>>
>>>> >>>> Thanks and Regards,
>>>> >>>> Parul
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Sent from Gmail Mobile
>>>> >>
>>>> >>
>>>> >
>>>>
>>>
>>>
>>
>

Re: Need help in nifi- flume processor

Posted by Joey Echeverria <jo...@gmail.com>.
> If you plan to use NiFi for the long term, it might be worth investing in converting your custom Flume components to NiFi processors. We can help you get started if you need any guidance going that route.

+1. Running Flume sources/sinks is meant as a transition step. It's
really useful if you have a complex Flume flow and want to migrate
only parts of it over to NiFi at a time. I would port any custom
sources and sinks to NiFi once you know that it will meet your needs
well. NiFi has a lot of documentation on writing processors and the
concepts map pretty well if you're already familiar with Flume's
execution model.

-Joey

On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bb...@gmail.com> wrote:
>
> Hi Parul,
>
> It is possible to deploy a custom Flume source/sink to NiFi, but due to the way the Flume processors load the classes for the sources and sinks, the jar you deploy to the lib directory also needs to include the other dependencies your source/sink needs (or they each need to individually be in lib/ directly).
>
> So here is a sample project I created that makes a shaded jar:
> https://github.com/bbende/my-flume-source
>
> It will contain the custom source and following dependencies all in one jar:
>
> org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
> +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
> +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
> +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
> +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>   \- com.google.guava:guava:jar:11.0.2:compile
>      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>
> I copied that to NiFi lib, restarted, created an ExecuteFlumeSource processor with the following config:
>
> Source Type = org.apache.flume.MySource
> Agent Name = a1
> Source Name = r1
> Flume Configuration = a1.sources = r1
>
> And I was getting the output in nifi/logs/nifi-bootstrap.log
>
> Keep in mind that this could become risky because any classes found in the lib directory would be accessible to all NARs in NiFi and would be found before classes within a NAR because the parent is checked first during class loading. This example isn't too risky because we are only bringing in flume jars and one guava jar, but for example if another nar uses a different version of guava this is going to cause a problem.
>
> If you plan to use NiFi for the long term, it might be worth investing in converting your custom Flume components to NiFi processors. We can help you get started if you need any guidance going that route.
>
> -Bryan
>
>
> On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <pa...@gmail.com> wrote:
>>
>> Hello Bryan,
>>
>> Thank you very much for your response.
>>
>> Is it possible to have a customized Flume source and sink in NiFi?
>> I have my own customized source and sink. I followed the steps below to add my own customized source, but it did not work.
>>
>> 1) Created Maven project and added customized source. (flume.jar was created after this step)
>> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>> 3) Added flume source processor with the below configuration
>>
>> Property           Value
>> Source Type         com.flume.source.Source
>> Agent Name      a1
>> Source Name         k1.
>>
>> But I am getting the below error in Flume Source Processor.
>> "Failed to run validation due to java.lang.NoClassDefFoundError : /org/apache/flume/PollableSource."
>>
>> Can you please help me in this regard. Any step/configuration I missed.
>>
>> Thanks and Regards,
>> Parul
>>
>>
>> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> The NiFi Flume processors are for running Flume sources and sinks within NiFi. They don't communicate with an external Flume process.
>>>
>>> In your example you would need an ExecuteFlumeSource configured to run the netcat source, connected to an ExecuteFlumeSink configured with the logger.
>>>
>>> -Bryan
>>>
>>> On Wednesday, October 7, 2015, Parul Agrawal <pa...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I was trying to run the NiFi Flume processor with the details
>>>> mentioned below, but could not bring it up.
>>>>
>>>> I already started flume with the sample configuration file
>>>> =============================================
>>>> # example.conf: A single-node Flume configuration
>>>>
>>>> # Name the components on this agent
>>>> a1.sources = r1
>>>> a1.sinks = k1
>>>> a1.channels = c1
>>>>
>>>> # Describe/configure the source
>>>> a1.sources.r1.type = netcat
>>>> a1.sources.r1.bind = localhost
>>>> a1.sources.r1.port = 44444
>>>>
>>>> # Describe the sink
>>>> a1.sinks.k1.type = logger
>>>>
>>>> # Use a channel which buffers events in memory
>>>> a1.channels.c1.type = memory
>>>> a1.channels.c1.capacity = 1000
>>>> a1.channels.c1.transactionCapacity = 100
>>>>
>>>> # Bind the source and sink to the channel
>>>> a1.sources.r1.channels = c1
>>>> a1.sinks.k1.channel = c1
>>>> =============================================
>>>>
>>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>>
>>>> In the Nifi browser of ExecuteFlumeSink following configuration was done:
>>>> Property           Value
>>>> Sink Type         logger
>>>> Agent Name      a1
>>>> Sink Name         k1.
>>>>
>>>> Event is sent to the flume using:
>>>> $ telnet localhost 44444
>>>> Trying 127.0.0.1...
>>>> Connected to localhost.localdomain (127.0.0.1).
>>>> Escape character is '^]'.
>>>> Hello world! <ENTER>
>>>> OK
>>>>
>>>> But I could not get any data in the nifi flume processor. Request your
>>>> help in this.
>>>> Do i need to change the example.conf file of flume so that Nifi Flume
>>>> Sink should get the data.
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>
>>>
>>>
>>> --
>>> Sent from Gmail Mobile
>>
>>
>

Re: Need help in nifi- flume processor

Posted by Bryan Bende <bb...@gmail.com>.
Hi Parul,

It is possible to deploy a custom Flume source/sink to NiFi, but due to the
way the Flume processors load the classes for the sources and sinks, the
jar you deploy to the lib directory also needs to include the other
dependencies your source/sink requires (or each dependency jar needs to be
placed directly in lib/ itself).

So here is a sample project I created that makes a shaded jar:
https://github.com/bbende/my-flume-source

It will contain the custom source and following dependencies all in one jar:

org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
+- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
+- org.apache.flume:flume-ng-core:jar:1.6.0:compile
+- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
+- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
  \- com.google.guava:guava:jar:11.0.2:compile
     \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
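
To produce a jar like that, the build binds the Maven Shade plugin to the
package phase; a minimal pom.xml fragment along these lines should work (the
plugin version here is an assumption -- check the sample project for the
exact setup):

```xml
<build>
  <plugins>
    <!-- Bundle the custom source and its Flume dependencies into a
         single jar so it can be dropped into NiFi's lib/ directory. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```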

I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
processor with the following config:

Source Type = org.apache.flume.MySource
Agent Name = a1
Source Name = r1
Flume Configuration = a1.sources = r1

And I was getting the output in nifi/logs/nifi-bootstrap.log

Keep in mind that this approach carries some risk: any classes in the lib
directory are visible to every NAR in NiFi, and they are found before
classes inside a NAR because the parent class loader is checked first. This
example isn't too risky because we are only bringing in the Flume jars and
one Guava jar, but if, for example, another NAR depends on a different
version of Guava, this will cause a problem.
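
The parent-first lookup described here is ordinary Java class loader
delegation, not anything NiFi-specific. A small stdlib-only sketch of the
effect (the class name is just for illustration):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Demonstrates parent-first delegation: a child class loader (like a
// NAR's) consults its parent (here standing in for NiFi's lib/ classpath)
// before looking anywhere else, so a class present in the parent shadows
// any copy the child could otherwise supply.
public class ParentFirstDemo {
    public static void main(String[] args) throws Exception {
        ClassLoader parent = ParentFirstDemo.class.getClassLoader();

        // A child loader with no URLs of its own; every lookup delegates up.
        URLClassLoader child = new URLClassLoader(new URL[0], parent);

        Class<?> clazz = child.loadClass("ParentFirstDemo");

        // The parent defined the class, not the child.
        System.out.println(clazz.getClassLoader() == parent); // prints "true"
    }
}
```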

If you plan to use NiFi for the long term, it might be worth investing in
converting your custom Flume components to NiFi processors. We can help you
get started if you need any guidance going that route.

-Bryan


On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <pa...@gmail.com>
wrote:


Re: Need help in nifi- flume processor

Posted by Parul Agrawal <pa...@gmail.com>.
Hello Bryan,

Thank you very much for your response.

Is it possible to have a customized Flume source and sink in NiFi?
I have my own customized source and sink. I followed the steps below to add
my own customized source, but it did not work.

1) Created Maven project and added customized source. (flume.jar was
created after this step)
2) Added flume.jar file to nifi-0.3.0/lib folder.
3) Added flume source processor with the below configuration

Property           Value
Source Type         com.flume.source.Source
Agent Name      a1
Source Name         k1

But I am getting the below error in Flume Source Processor.
"Failed to run validation due to java.lang.NoClassDefFoundError :
/org/apache/flume/PollableSource."

Can you please help me in this regard? Is there any step or configuration I missed?

Thanks and Regards,
Parul


On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bb...@gmail.com> wrote:


Re: Need help in nifi- flume processor

Posted by Bryan Bende <bb...@gmail.com>.
Hello,

The NiFi Flume processors are for running Flume sources and sinks within
NiFi. They don't communicate with an external Flume process.

In your example you would need an ExecuteFlumeSource configured to run the
netcat source, connected to an ExecuteFlumeSink configured with the logger.
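
For the example.conf in the quoted message, that flow might be configured
roughly as follows. The multi-line Flume Configuration values are my
assumption of how the netcat source's bind/port settings would be supplied;
the processors provide the channel themselves, so no channel properties are
needed:

```
# ExecuteFlumeSource
Source Type          netcat
Agent Name           a1
Source Name          r1
Flume Configuration  a1.sources = r1
                     a1.sources.r1.bind = localhost
                     a1.sources.r1.port = 44444

# ExecuteFlumeSink (connected downstream of the source)
Sink Type            logger
Agent Name           a1
Sink Name            k1
Flume Configuration  a1.sinks = k1
```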

-Bryan

On Wednesday, October 7, 2015, Parul Agrawal <pa...@gmail.com>
wrote:

> Hi,
>
> I was trying to run the NiFi Flume processor with the below-mentioned
> details but could not bring it up.
>
> I already started flume with the sample configuration file
> =============================================
> # example.conf: A single-node Flume configuration
>
> # Name the components on this agent
> a1.sources = r1
> a1.sinks = k1
> a1.channels = c1
>
> # Describe/configure the source
> a1.sources.r1.type = netcat
> a1.sources.r1.bind = localhost
> a1.sources.r1.port = 44444
>
> # Describe the sink
> a1.sinks.k1.type = logger
>
> # Use a channel which buffers events in memory
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 1000
> a1.channels.c1.transactionCapacity = 100
>
> # Bind the source and sink to the channel
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
> =============================================
>
> Command used to start flume : $ bin/flume-ng agent --conf conf
> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>
> In the NiFi browser, the following configuration was done for ExecuteFlumeSink:
> Property           Value
> Sink Type         logger
> Agent Name      a1
> Sink Name         k1
>
> Event is sent to the flume using:
> $ telnet localhost 44444
> Trying 127.0.0.1...
> Connected to localhost.localdomain (127.0.0.1).
> Escape character is '^]'.
> Hello world! <ENTER>
> OK
>
> But I could not get any data in the NiFi Flume processor. I request your
> help with this.
> Do I need to change Flume's example.conf file so that the NiFi Flume
> sink gets the data?
>
> Thanks and Regards,
> Parul
>


-- 
Sent from Gmail Mobile