Posted to user@flume.apache.org by Priyanka jain <ja...@gmail.com> on 2012/12/26 18:39:17 UTC
flume-ng cassandra sink problem..
Hi,
I am working on a POC of Cassandra and Flume integration. For that I am
using the Cassandra sink plugin from GitHub (flume-ng Cassandra sink plugin),
with:
Flume-NG version: 1.2.0
Apache Cassandra version: 1.1.5
I have built the jar using Maven and am using the sink configuration below,
in flume.conf.cassandra in the conf directory:

agent.sources = avrosource
agent.channels = channel1
agent.sinks = cassandraSink

#source definition
agent.sources.avrosource.channels = channel1
agent.sources.avrosource.type = exec
agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt

#agent.sources.avrosource.type = avro
#agent.sources.avrosource.channels = channel1
#agent.sources.avrosource.bind =127.0.0.1
#agent.sources.avrosource.port =41414

#Flume header event
agent.sources.avrosource.interceptors = addHost
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host
agent.sources.avrosource.interceptors = addTimestamp
agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Cassandra flow
agent.channels.channel1.type = FILE
agent.channels.channel1.checkpointDir = file-channel1/check
agent.channels.channel1.dataDirs = file-channel1/data

agent.sinks.cassandraSink.channel = channel1
agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
agent.sinks.cassandraSink.port = 9160
agent.sinks.cassandraSink.keyspace-name = logs
agent.sinks.cassandraSink.records-colfam = records

I am running this using the command:

flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra -Dflume.root.logger=DEBUG,console

I get this error while running:

2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
    at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
    at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
    ... 3 more

I found one hint that the row key is built from src + key, but I do not
understand how to configure it.
Can anyone please help me solve this problem?
Re: flume-ng cassandra sink problem..
Posted by Priyanka Jain <ja...@gmail.com>.
On Thu, Dec 27, 2012 at 12:22 PM, Priyanka Jain
<ja...@gmail.com> wrote:
> Hi,
>
> I am sending some log output for the error in the attached file.
Re: flume-ng cassandra sink problem..
Posted by Priyanka Jain <ja...@gmail.com>.
Hi,
On Wed, Dec 26, 2012 at 11:50 PM, Nitin Pawar <ni...@gmail.com> wrote:
> Can you provide a sample line from your log?
>
> The Cassandra sink you are using looks for particular event headers on
> your log events, such as key, src and host.
Re: flume-ng cassandra sink problem..
Posted by Nitin Pawar <ni...@gmail.com>.
Can you provide a sample line from your log?
The Cassandra sink you are using looks for particular event headers on
your log events, such as key, src and host.
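
One way to see which headers the events actually carry is to route the channel to Flume's built-in logger sink for a test run. The fragment below is only a sketch, assuming the same agent and channel names as the configuration posted at the top of the thread; debugSink is a name chosen here for illustration.

```
# Temporarily use the logger sink instead of the Cassandra sink.
# "debugSink" is an illustrative name, not something from the plugin.
agent.sinks = debugSink
agent.sinks.debugSink.type = logger
agent.sinks.debugSink.channel = channel1
```

With -Dflume.root.logger=DEBUG,console, each logged event should show up on the console together with its headers, which makes it easy to check whether key, src, host and timestamp are present before switching back to the Cassandra sink.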
On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain
<ja...@gmail.com> wrote:
> Hi Nitin,
> Thanks for your suggestion.
>
> I have done everything described in the README, but I cannot work out
> where to set that key.
> Can you please give me an idea of where I can configure it, or where it
> gets generated?
--
Nitin Pawar
Re: flume-ng cassandra sink problem..
Posted by Priyanka jain <ja...@gmail.com>.
Hi Nitin,
Thanks for your suggestion.
I have done everything described in the README, but I cannot work out
where to set that key.
Can you please give me an idea of where I can configure it, or where it
gets generated?
On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <ni...@gmail.com> wrote:
> from the README
> you need to have following things in conf
>
> The Sink expects several flume event headers to be present:
>
> - key - used (combined with src) to create the Cassandra row key. It
> should be generated by the application doing the logging
> - timestamp - timestamp of when the log occurred, not necessarily when
> the flume event is created
> - src - A logical source of the flume event. Could be host, but
> probably you will have many hosts for a source. A more likely candidate for
> source is the name of the application
> - host - the name of the host where the message was generated
Re: flume-ng cassandra sink problem..
Posted by Nitin Pawar <ni...@gmail.com>.
From the README, you need to have the following things in your conf.
The Sink expects several flume event headers to be present:
- key - used (combined with src) to create the Cassandra row key. It
  should be generated by the application doing the logging
- timestamp - timestamp of when the log occurred, not necessarily when
  the flume event is created
- src - A logical source of the flume event. Could be host, but probably
  you will have many hosts for a source. A more likely candidate for source
  is the name of the application
- host - the name of the host where the message was generated
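
One detail in the configuration posted at the top of the thread may matter here: it assigns agent.sources.avrosource.interceptors twice, and in a properties file the later assignment normally replaces the earlier one, so probably only addTimestamp is active and the host header is never added. Below is a rough sketch of a chained interceptor list. The addSrc and addKey entries are assumptions for illustration only: the static interceptor is, as far as I recall, available only in Flume releases newer than 1.2.0, myapp is a placeholder value, and com.example.flume.KeyInterceptor is a hypothetical custom class that would have to be written, because a static value would give every event the same key.

```
# Interceptors are listed once, space-separated, and run in the order given.
agent.sources.avrosource.interceptors = addHost addTimestamp addSrc addKey

agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host

agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Assumption: the static interceptor exists in the Flume version in use.
# "myapp" is a placeholder for the logical source name.
agent.sources.avrosource.interceptors.addSrc.type = static
agent.sources.avrosource.interceptors.addSrc.key = src
agent.sources.avrosource.interceptors.addSrc.value = myapp

# Hypothetical custom interceptor that sets a unique "key" header per event.
agent.sources.avrosource.interceptors.addKey.type = com.example.flume.KeyInterceptor$Builder
```

The README says key should be generated by the application doing the logging, so with an exec source tailing a plain file, something on the Flume side (such as a custom interceptor) has to supply it instead.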
--
Nitin Pawar