Posted to user@flume.apache.org by Dan Everton <da...@iocaine.org> on 2011/07/21 05:00:00 UTC

Fwd: Possible Bug in AvroEventSource

Sorry, sent this to the wrong mailing list.


----- Original message -----
From: "Dan Everton" <da...@iocaine.org>
To: flume-user@cloudera.org
Date: Thu, 21 Jul 2011 12:59:19 +1000
Subject: Possible Bug in AvroEventSource

We're using a custom logging library to write to a locally installed
Flume Node instance with an avroSource configured. Every so often the
Flume Node stops responding and its thread count starts going up
dramatically. Checking the thread dumps from the node, we see this:

Thread 611 (783209204@qtp-689559925-452):
  State: WAITING
  Blocked count: 0
  Waited count: 1
  Waiting on
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23085bf6
  Stack:
    sun.misc.Unsafe.park(Native Method)
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
    java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:254)
    com.cloudera.flume.handlers.avro.AvroEventSource.enqueue(AvroEventSource.java:116)
    com.cloudera.flume.handlers.avro.AvroEventSource$1.append(AvroEventSource.java:137)
    sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    java.lang.reflect.Method.invoke(Method.java:597)
    org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
    org.apache.avro.ipc.Responder.respond(Responder.java:150)
    org.apache.avro.ipc.Responder.respond(Responder.java:100)
    org.apache.avro.ipc.ResponderServlet.doPost(ResponderServlet.java:48)
    javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
    org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
    org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:401)
    org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
    org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
    org.mortbay.jetty.Server.handle(Server.java:326)

over and over again.

I think what's happening is that something causes the Avro server to
fail, and the events just queue up, eventually causing the node to fail.

Poking around the AvroEventSource.java code around those lines, I see
this:

    this.svr = new FlumeEventAvroServerImpl(port) {
      @Override
      public void append(AvroFlumeEvent evt) {
        // convert AvroEvent evt -> e
        AvroEventAdaptor adapt = new AvroEventAdaptor(evt);
        try {
          enqueue(adapt.toFlumeEvent());
        } catch (IOException e1) {
          e1.printStackTrace();
        }
        super.append(evt);
      }
    };

The fact that any exceptions from the enqueue process get swallowed
seems problematic to me, but I'm not sure if it's why things eventually
fail.
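
To illustrate the concern, here is a minimal sketch of the alternative:
rethrow the failure so the caller of append() learns the event was not
accepted, instead of printing the stack trace and carrying on. EventSink
and PropagatingHandler are hypothetical stand-ins invented for this
example; they are not Flume or Avro classes, and a real fix would have
to fit whatever error reporting the Avro RPC layer supports.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT Flume or Avro classes.
interface EventSink {
  void enqueue(String event) throws IOException;
}

class PropagatingHandler {
  private final EventSink sink;

  PropagatingHandler(EventSink sink) {
    this.sink = sink;
  }

  // Instead of e.printStackTrace(), rethrow so the layer that invoked
  // append() can report the failure back to the client.
  void append(String event) {
    try {
      sink.enqueue(event);
    } catch (IOException e) {
      throw new UncheckedIOException("event not enqueued: " + event, e);
    }
  }

  public static void main(String[] args) {
    // A sink that accepts everything.
    List<String> accepted = new ArrayList<>();
    new PropagatingHandler(evt -> accepted.add(evt)).append("hello");
    System.out.println("accepted: " + accepted);

    // A sink that always fails (simulated); the caller now sees the error.
    PropagatingHandler failing = new PropagatingHandler(evt -> {
      throw new IOException("simulated enqueue failure");
    });
    try {
      failing.append("lost?");
    } catch (UncheckedIOException e) {
      System.out.println("caller saw: " + e.getMessage());
    }
  }
}
```

Whether an unchecked exception is the right vehicle depends on the RPC
framework; the point is only that the failure reaches the sender.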

Has anyone else seen something like this? For the moment we're going to
switch back to Thrift as that seems to be better tested.

Cheers,
Dan


Re: Fwd: Possible Bug in AvroEventSource

Posted by Dan Everton <da...@iocaine.org>.
On Mon, 25 Jul 2011 15:42 -0700, "Jonathan Hsieh" <jo...@cloudera.com>
wrote:
> Dan,
> 
> Nice catch and I agree with you.  I'll file an issue to clean this up once
> we get the new issue tracker up.
> 
> Thanks,
> Jon.

Cool, it would be good to get it fixed so the Avro RPC is at parity with
the Thrift RPC. However, I think I've found the root cause of the
problems I was seeing with sending log messages and it's nothing to do
with Flume code.

We're currently testing Flume in a pre-production environment with about
20 Flume nodes. Our setup is fairly straightforward. Basically:

srvX.example.com-serverLog-appname serverLogFlow thriftSource(31308)
agentDFOSink("log1", 35853, batchCount=100, batchMillis=30000,
compression=true)
srvX.example.com-accessLog-appname accessLogFlow tailDir("...", "...",
true) agentDFOSink("log1", 35854, batchCount=100, batchMillis=30000,
compression=true)

log1.example.com-serverLog-collector serverLogFlow
collectorSource(35853) collectorSink("...", "...", 300000)
log1.example.com-accessLog-collector accessLogFlow
collectorSource(35854) collectorSink("...", "...", 300000)

The applications use a custom log library (similar to the Log4j Avro
appender in Flume) to write to the local Flume node's thriftSource. This
is a blocking write which waits for the node to respond before letting
the application continue. This works fine and we've tested it up to 3000
events per second per node. The problem is that, as this is a test
environment, some applications are idle and don't write logs. As a
result, the agent's connection to the collector can go many hours
without carrying any data. This is when the problems start.

The firewall sees these idle connections and closes them without
informing the agent or the collector that the connection has been
dropped. Now, when the application does start trying to write a log
message it blocks waiting for the agent to forward the event to the
collector. Eventually the node's internal queue fills up because it hasn't
been able to send the events, and the application comes to a halt because
all its threads are busy trying to write to the local agent. The agent
is still heartbeating away and appears to be healthy to the master, but
no log traffic comes through.
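
The failure mode above can be modelled in a few lines. This is a rough
sketch, not Flume code: StalledQueueDemo is a hypothetical class, and the
bounded LinkedBlockingQueue merely stands in for the node's internal
queue. With nothing draining the queue, a plain put() would park the
writer indefinitely once the queue is full; offer() with a timeout gives
up instead, so the writer can at least notice the stall.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of an agent's bounded event queue; not Flume code.
class StalledQueueDemo {
  // Tries to enqueue `count` events while nothing drains the queue.
  // Returns how many were accepted before an offer timed out.
  static int fillWithoutConsumer(int capacity, int count, long timeoutMillis) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
    int accepted = 0;
    for (int i = 0; i < count; i++) {
      try {
        // put() would block here forever once the queue is full.
        // offer() with a timeout returns false instead.
        if (!queue.offer(i, timeoutMillis, TimeUnit.MILLISECONDS)) {
          break;
        }
        accepted++;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return accepted;
  }

  public static void main(String[] args) {
    // Capacity 3, ten events, no consumer: prints 3.
    System.out.println(fillWithoutConsumer(3, 10, 50));
  }
}
```

A timeout only turns a silent hang into a visible error; it does nothing
about the dropped connection itself.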

We've seen this issue before with long lived JDBC connections and we'll
fix it in this case the same way: by increasing the session timeout for
the Flume traffic in the firewall. I'm not sure what a proper solution
would be. Setting SO_KEEPALIVE on the socket might help, but by default
that only pings the connection every two hours. It may not be often
enough to prevent a firewall from considering the connection idle and
killable. Perhaps some sort of keepalive event could be sent over the
agentDFOSink link every so often, but that seems inelegant.
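
For reference, both ideas are small in code. The sketch below is
hypothetical and not part of Flume: KeepAliveSketch enables SO_KEEPALIVE
on a plain java.net.Socket (the probe interval itself is an operating
system setting and is not changed here), and IdleHeartbeat is an
invented helper that decides when an idle link is due a no-op event.
Time is passed in as a parameter so the logic can be checked without
waiting.

```java
import java.io.UncheckedIOException;
import java.net.Socket;
import java.net.SocketException;

// Hypothetical helpers for illustration; neither is part of Flume.
class KeepAliveSketch {
  // Returns an unconnected socket with SO_KEEPALIVE enabled. How often
  // the OS probes an idle connection is a system setting, not set here.
  static Socket newKeepAliveSocket() {
    Socket socket = new Socket();
    try {
      socket.setKeepAlive(true);
    } catch (SocketException e) {
      throw new UncheckedIOException(e);
    }
    return socket;
  }

  public static void main(String[] args) throws Exception {
    try (Socket socket = newKeepAliveSocket()) {
      System.out.println("SO_KEEPALIVE enabled: " + socket.getKeepAlive());
    }
  }
}

// Decides when an otherwise idle link should carry a no-op event.
class IdleHeartbeat {
  private final long maxIdleMillis;
  private long lastSendMillis;

  IdleHeartbeat(long maxIdleMillis, long nowMillis) {
    this.maxIdleMillis = maxIdleMillis;
    this.lastSendMillis = nowMillis;
  }

  // Call whenever any event, real or heartbeat, goes over the link.
  void recordSend(long nowMillis) {
    lastSendMillis = nowMillis;
  }

  // True once the link has been idle for at least maxIdleMillis; the
  // caller would then send a no-op event and call recordSend().
  boolean heartbeatDue(long nowMillis) {
    return nowMillis - lastSendMillis >= maxIdleMillis;
  }
}
```

In practice maxIdleMillis would need to sit comfortably below the
firewall's session timeout.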

Anyway, just thought I'd write this up in case someone else runs into
it.

Cheers,
Dan

Re: Fwd: Possible Bug in AvroEventSource

Posted by Jonathan Hsieh <jo...@cloudera.com>.
Dan,

Nice catch and I agree with you.  I'll file an issue to clean this up once
we get the new issue tracker up.

Thanks,
Jon.

On Sun, Jul 24, 2011 at 10:37 PM, Dan Everton <da...@iocaine.org> wrote:

> On Thu, 21 Jul 2011 13:00 +1000, "Dan Everton" <da...@iocaine.org> wrote:
>
> > Poking around the AvroEventSource.java code around those lines I see
> > this
> >
> >     this.svr = new FlumeEventAvroServerImpl(port) {
> >       @Override
> >       public void append(AvroFlumeEvent evt) {
> >         // convert AvroEvent evt -> e
> >         AvroEventAdaptor adapt = new AvroEventAdaptor(evt);
> >         try {
> >           enqueue(adapt.toFlumeEvent());
> >         } catch (IOException e1) {
> >           e1.printStackTrace();
> >         }
> >         super.append(evt);
> >       }
> >     };
> >
> > The fact that any exceptions from the enqueue process get swallowed
> > seems problematic to me, but I'm not sure if it's why things eventually
> > fail.
>
> I think I totally misread the above code and the problem is likely
> elsewhere. I still think it's bad that the exception gets swallowed, but
> I don't understand Flume well enough to know what the consequence of
> that is. Either way, switching back to using Thrift for RPC seems to
> have solved this for now.
>
> Cheers,
> Dan
>



-- 
// Jonathan Hsieh (shay)
// Software Engineer, Cloudera
// jon@cloudera.com

Re: Fwd: Possible Bug in AvroEventSource

Posted by Dan Everton <da...@iocaine.org>.
On Thu, 21 Jul 2011 13:00 +1000, "Dan Everton" <da...@iocaine.org> wrote:

> Poking around the AvroEventSource.java code around those lines I see
> this
> 
>     this.svr = new FlumeEventAvroServerImpl(port) {
>       @Override
>       public void append(AvroFlumeEvent evt) {
>         // convert AvroEvent evt -> e
>         AvroEventAdaptor adapt = new AvroEventAdaptor(evt);
>         try {
>           enqueue(adapt.toFlumeEvent());
>         } catch (IOException e1) {
>           e1.printStackTrace();
>         }
>         super.append(evt);
>       }
>     };
> 
> The fact that any exceptions from the enqueue process get swallowed
> seems problematic to me, but I'm not sure if it's why things eventually
> fail.

I think I totally misread the above code and the problem is likely
elsewhere. I still think it's bad that the exception gets swallowed, but
I don't understand Flume well enough to know what the consequence of
that is. Either way, switching back to using Thrift for RPC seems to
have solved this for now.

Cheers,
Dan