Posted to user@flume.apache.org by Priyanka jain <ja...@gmail.com> on 2012/12/26 18:39:17 UTC

flume-ng cassandra sink problem..

Hi,

I am working on a proof of concept (POC) for integrating Cassandra with Flume. For that I am using the flume-ng Cassandra sink plugin from GitHub, with:

Flume NG version: 1.2.0
Apache Cassandra version: 1.1.5

I have built the jar using Maven, and I am using the sink configuration below in flume.conf.cassandra in the conf directory:

agent.sources = avrosource
agent.channels = channel1
agent.sinks = cassandraSink

# source definition
agent.sources.avrosource.channels = channel1
agent.sources.avrosource.type = exec
agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt

#agent.sources.avrosource.type = avro
#agent.sources.avrosource.channels = channel1
#agent.sources.avrosource.bind = 127.0.0.1
#agent.sources.avrosource.port = 41414

# Flume header event
agent.sources.avrosource.interceptors = addHost
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host
agent.sources.avrosource.interceptors = addTimestamp
agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Cassandra flow
agent.channels.channel1.type = FILE
agent.channels.channel1.checkpointDir = file-channel1/check
agent.channels.channel1.dataDirs = file-channel1/data

agent.sinks.cassandraSink.channel = channel1
agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
agent.sinks.cassandraSink.port = 9160
agent.sinks.cassandraSink.keyspace-name = logs
agent.sinks.cassandraSink.records-colfam = records

I am running it with this command:

flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ \
    -f /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra \
    -Dflume.root.logger=DEBUG,console


Running it produces the following error:

2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
    at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
    at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
    ... 3 more

One suggestion I found is that the row key is built from src + key, but I don't understand how to configure that.
Can anyone please help me solve this problem?
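Separately from the missing key, the configuration above assigns agent.sources.avrosource.interceptors twice. As far as I know, Flume 1.x reads flume.conf through java.util.Properties. With that class, a repeated key silently replaces the earlier value, so only addTimestamp would be active and the host header would never be set. Flume expects all interceptor names on one line as a space-separated list. The sketch below just loads the two posted lines with java.util.Properties to show the effect; the class name InterceptorKeyDemo is made up for illustration. Fixing this restores the host header but does not supply the key or src headers the sink is complaining about.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class: loads flume.conf-style lines with java.util.Properties
// to show what happens when the same key appears twice.
public class InterceptorKeyDemo {

    // The two interceptor lines from the posted flume.conf.cassandra.
    static final String POSTED =
            "agent.sources.avrosource.interceptors = addHost\n"
          + "agent.sources.avrosource.interceptors = addTimestamp\n";

    // The same setting written as a single space-separated list.
    static final String FIXED =
            "agent.sources.avrosource.interceptors = addHost addTimestamp\n";

    // Returns the value Properties ends up holding for the interceptors key.
    static String interceptors(String conf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            // Reading from an in-memory string is not expected to fail.
            throw new UncheckedIOException(e);
        }
        return props.getProperty("agent.sources.avrosource.interceptors");
    }

    public static void main(String[] args) {
        // The later line replaces the earlier one, so addHost is lost.
        System.out.println("posted: " + interceptors(POSTED)); // posted: addTimestamp
        System.out.println("fixed:  " + interceptors(FIXED));  // fixed:  addHost addTimestamp
    }
}
```

In flume.conf.cassandra that would mean replacing both interceptors lines with a single `agent.sources.avrosource.interceptors = addHost addTimestamp`; the per-interceptor `.type` and other settings stay as they are.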

Re: flume-ng cassandra sink problem..

Posted by Priyanka Jain <ja...@gmail.com>.
On Thu, Dec 27, 2012 at 12:22 PM, Priyanka Jain <ja...@gmail.com> wrote:

> Hi,
>
> Sending some logs for the error in the attached file.
>

Re: flume-ng cassandra sink problem..

Posted by Priyanka Jain <ja...@gmail.com>.
Hi,

Re: flume-ng cassandra sink problem..

Posted by Nitin Pawar <ni...@gmail.com>.
Can you provide a sample line from your log?

The Cassandra sink you are using looks for particular headers on each event,
such as key, src and host.


-- 
Nitin Pawar

Re: flume-ng cassandra sink problem..

Posted by Priyanka jain <ja...@gmail.com>.
Hi Nitin,
Thanks for your suggestion.

I have done everything described in the README, but I can't tell where I can
set that key.
Can you please give me an idea of where I can configure it, or where it gets
generated?
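On where the key comes from: per the README, key and src must already be headers on each Flume event by the time it reaches the sink; the sink does not create them. An exec source running tail -f sets no headers. As far as I know, the interceptors bundled with Flume 1.2.0 (timestamp, host, regex filtering) cannot add arbitrary ones either. Later Flume releases add a static interceptor, but it puts the same value on every event, which for key would presumably place every event under the same row key. So the headers most likely have to come from the application doing the logging, or from a custom interceptor. The sketch below shows only the per-event header logic such code would need. HeaderStamper, the sample values and the UUID-based key are illustrative assumptions, not part of the sink or of Flume.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper (not part of Flume or the Cassandra sink): builds the
// four headers the sink's README asks for, for one log event.
public class HeaderStamper {
    private final String src;   // logical source, e.g. the application name
    private final String host;  // host where the message was generated

    HeaderStamper(String src, String host) {
        this.src = src;
        this.host = host;
    }

    Map<String, String> headersFor(long eventTimeMillis) {
        Map<String, String> headers = new HashMap<>();
        // A random UUID gives each event a distinct key. The README only says the
        // logging application should generate it, so this choice is an assumption.
        headers.put("key", UUID.randomUUID().toString());
        headers.put("src", src);
        headers.put("host", host);
        // When the log occurred, as epoch milliseconds (assumed format; this is
        // what Flume's timestamp interceptor writes, but check what the sink parses).
        headers.put("timestamp", Long.toString(eventTimeMillis));
        return headers;
    }

    public static void main(String[] args) {
        HeaderStamper stamper = new HeaderStamper("myapp", "web01"); // made-up values
        System.out.println(stamper.headersFor(System.currentTimeMillis()));
    }
}
```

The sending side would then attach such a map to each event, for example with `EventBuilder.withBody(body, headers)` from the Flume client SDK (`org.apache.flume.event.EventBuilder`), and deliver it to an avro source like the one commented out in the posted configuration.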

Re: flume-ng cassandra sink problem..

Posted by Nitin Pawar <ni...@gmail.com>.
From the README, you need to have the following in place:

The Sink expects several flume event headers to be present:

   - key - used (combined with src) to create the Cassandra row key. It
   should be generated by the application doing the logging
   - timestamp - timestamp of when the log occurred, not necessarily when
   the flume event is created
   - src - A logical source of the flume event. Could be host, but probably
   you will have many hosts for a source. A more likely candidate for source
   is the name of the application
   - host - the name of the host where the message was generated
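To see why the posted setup trips this check, compare the headers its events actually carry with the list above. The sketch below is a hypothetical stand-in for the sink's validation: the class RequiredHeaders and its missing method are made up, and the real check lives in CassandraSinkRepository.saveToCassandra according to the stack trace, with logic not shown in this thread. As far as I can tell, an event from the posted exec source carries only a timestamp header, because the second interceptors line disables the host interceptor. That leaves key, src and host absent, which fits the 'key' error in the stack trace.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's header validation; NOT its real code.
public class RequiredHeaders {

    // Header names from the sink's README.
    static final List<String> REQUIRED = List.of("key", "timestamp", "src", "host");

    // Returns the required headers absent from an event's header map, in README order.
    static List<String> missing(Map<String, String> headers) {
        List<String> absent = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!headers.containsKey(name)) {
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        // Roughly what the posted exec source plus timestamp interceptor produce.
        Map<String, String> posted = new LinkedHashMap<>();
        posted.put("timestamp", Long.toString(System.currentTimeMillis()));
        System.out.println("posted config, missing: " + missing(posted)); // [key, src, host]

        // With key and src supplied by the logging application (made-up values)
        // and host restored by the host interceptor.
        Map<String, String> complete = new LinkedHashMap<>(posted);
        complete.put("host", "web01");
        complete.put("src", "myapp");
        complete.put("key", "req-0001");
        System.out.println("complete event, missing: " + missing(complete)); // []
    }
}
```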



-- 
Nitin Pawar