Posted to user@flume.apache.org by John Michaels <mi...@gmail.com> on 2012/12/18 22:13:02 UTC

Flume-ng Avro RPC and Python

Hi,

I have a flume-ng source listening on port 45454. When I use the following
Python script to send an event to it, I get the Netty exception below in the
Flume logs.

Has anyone had any success sending events via Python? Can anyone suggest a
workaround, or am I doing something wrong?

Thanks,
John


from avro import ipc, protocol

server_addr = ('localhost', 45454)
# Avro protocol definition for Flume's Avro source (flume.avpr)
PROTOCOL = protocol.parse(open("flume.avpr").read())

def sendData():
    client = ipc.HTTPTransceiver(server_addr[0], server_addr[1])
    requestor = ipc.Requestor(PROTOCOL, client)

    # A Flume event: a map of string headers plus a bytes body
    event = dict()
    event['headers'] = {'table_name': 'foo', 'database': 'bar'}
    event['body'] = b'hello'
    params = dict()
    params['event'] = event
    print("Result : %s" % requestor.request('append', params))
    client.close()

if __name__ == '__main__':
    sendData()

18 Dec 2012 21:06:46,678 WARN  [New I/O server worker #1-5] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) - Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 539959368 items! Connection closed.
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
        at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
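
The item count in this log is a useful clue. Assuming Avro's Netty frame decoder starts each frame by reading two big-endian 32-bit integers (a serial number, then the list size), the reported count decodes to the four bytes " / H", which are bytes 4 to 7 of an HTTP request line. That fits the source receiving a plain HTTP POST instead of a Netty-framed Avro message. The request line below is an assumption about what HTTPTransceiver sends, not a packet capture.

```python
import struct

# Item count reported in the Flume log above.
reported_count = 539959368

# Assumption: HTTPTransceiver opens its request with a line like this one.
request_line = b"POST / HTTP/1.1\r\n"

# Assumption: the Netty frame header is two big-endian int32 values
# (serial number, then list size) taken from the first 8 bytes.
serial, list_size = struct.unpack(">ii", request_line[:8])

print(struct.pack(">i", reported_count))  # b' / H'
print(serial, list_size)                  # 1347375956 539959368
```

In other words, "POST" is read as the serial number and " / H" as the number of buffers to allocate, which trips the size check.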

RE: Flume-ng Avro RPC and Python

Posted by "Camp, Roy" <rc...@ebay.com>.
We use Thrift to send events from Python, PHP and Java. The only catch is that you have to use the legacy Thrift source, which means your application will not get a confirmation after the event has been saved to the channel. We have found, though, that simply detecting connection failure has been highly reliable.

We have not evaluated the HTTPSource yet, so you may want to check that out as well.

Roy


-----Original Message-----
From: Brock Noland [mailto:brock@cloudera.com] 
Sent: Tuesday, December 18, 2012 1:17 PM
To: user@flume.apache.org
Subject: Re: Flume-ng Avro RPC and Python

Hi,

This is because Flume's Avro source expects clients to use NettyTransceiver, and the Python Avro library only supports HTTPTransceiver.

This does not use Avro, but you should be able to send JSON events to the HTTPSource (http://flume.apache.org/FlumeUserGuide.html#http-source).

Brock


--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Re: Flume-ng Avro RPC and Python

Posted by Brock Noland <br...@cloudera.com>.
Hi,

This is because Flume's Avro source expects clients to use
NettyTransceiver, and the Python Avro library only supports HTTPTransceiver.
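
To make the framing difference concrete, here is a minimal sketch of the outer frame layout, assuming it matches Avro's Java NettyTransportCodec: an 8-byte pack header (serial number and buffer count), then each buffer prefixed with its 4-byte length, all big-endian. It deliberately stops at framing. A real client would also need the Avro handshake and call encoding inside those buffers, so this is not a drop-in replacement for HTTPTransceiver. Both function names are hypothetical.

```python
import struct
from typing import List, Tuple

def encode_netty_pack(serial: int, buffers: List[bytes]) -> bytes:
    """Hypothetical helper: wrap already-encoded Avro buffers in Netty framing."""
    # Pack header: serial number, then number of buffers (big-endian int32).
    out = [struct.pack(">ii", serial, len(buffers))]
    for buf in buffers:
        # Each buffer is preceded by its length as a big-endian int32.
        out.append(struct.pack(">i", len(buf)))
        out.append(buf)
    return b"".join(out)

def decode_netty_pack(data: bytes) -> Tuple[int, List[bytes]]:
    """Hypothetical inverse of encode_netty_pack (no truncation checks)."""
    serial, count = struct.unpack_from(">ii", data, 0)
    offset = 8
    buffers = []
    for _ in range(count):
        (length,) = struct.unpack_from(">i", data, offset)
        offset += 4
        buffers.append(data[offset:offset + length])
        offset += length
    return serial, buffers

frame = encode_netty_pack(1, [b"abc", b"de"])
```

An HTTP client sends none of this structure, so the server ends up interpreting the HTTP request line as the pack header.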

This does not use Avro, but you should be able to send JSON events to
the HTTPSource (http://flume.apache.org/FlumeUserGuide.html#http-source).
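
For the HTTPSource route, a minimal Python 3 sketch using only the standard library. It assumes the HTTPSource uses its default JSON handler, which accepts a JSON array of events, each with a "headers" map of strings and a "body" string. The function names are hypothetical, and the URL you pass must match the port set in your agent configuration.

```python
import json
import urllib.request
from typing import Dict, List

def build_payload(events: List[Dict]) -> bytes:
    """Serialize events as a JSON array of {"headers": ..., "body": ...}."""
    batch = [
        # Header values are sent as strings; the body is a plain text string.
        {"headers": {k: str(v) for k, v in e.get("headers", {}).items()},
         "body": e["body"]}
        for e in events
    ]
    return json.dumps(batch).encode("utf-8")

def send_events(url: str, events: List[Dict], timeout: float = 5.0) -> int:
    """POST a batch of events; urlopen raises HTTPError on 4xx/5xx responses."""
    req = urllib.request.Request(
        url,
        data=build_payload(events),
        headers={"Content-Type": "application/json; charset=UTF-8"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status

events = [{"headers": {"table_name": "foo", "database": "bar"}, "body": "hello"}]
payload = build_payload(events)
```

Calling send_events("http://localhost:5140", events) would post the same event as the original script; the host and port there are placeholders.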

Brock
