Posted to user@flume.apache.org by Yvonne Lam <yv...@aro.com> on 2011/08/16 18:54:06 UTC
flume stops sending data
We are using Flume to copy the same log data to S3 and HDFS. After we
upgraded to 0.9.4 about a week ago, we started running into an issue where
Flume occasionally stops sending data to one sink or the other. Any suggestions
as to what might be going on?
version: flume 0.9.4
setup:
ObservationNode-1 TomcatDatalogFlow
tailDir("<logdir>","<logfile_names>",true) autoBEChain
ObservationConsumer-1 TomcatDatalogFlow autoCollectorSource
collectorSink("s3n://<key>:<secret>@<bucket>/logs/dataSource=%{tailSrcFile}/date=%Y%m%d/","%{tailSrcFile}","300000")
HDFSObservationNode-1 TomcatHDFSFlow
tailDir("<logdir>","<logfile_names>",true) autoBEChain
HDFSObservationConsumer-1 TomcatHDFSFlow autoCollectorSource
collectorSink("hdfs://<hdfs_node>:9000/<dirname>/logs/dataSource=%{tailSrcFile}/date=%Y%m%d/",
"%{tailSrcFile}", "30000")
The observation nodes are running on one physical node; the consumers are
running on another physical node.
thread dump from the collector:
2011-08-16 15:48:10
Full thread dump Java HotSpot(TM) Client VM (19.1-b02 mixed mode, sharing):
"Thread-2" daemon prio=10 tid=0x09240000 nid=0x23b7 runnable [0xb412d000]
java.lang.Thread.State: RUNNABLE
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:177)
at com.cloudera.util.InputStreamPipe$CopyThread.run(InputStreamPipe.java:100)
"Thread-1" daemon prio=10 tid=0x0923f000 nid=0x23b6 runnable [0xb417e000]
java.lang.Thread.State: RUNNABLE
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:199)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
- locked <0x7f3821c0> (a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(FilterInputStream.java:90)
at com.cloudera.util.InputStreamPipe$CopyThread.run(InputStreamPipe.java:100)
"process reaper" daemon prio=10 tid=0x091a9400 nid=0x23b4 runnable [0xb41cf000]
java.lang.Thread.State: RUNNABLE
at java.lang.UNIXProcess.waitForProcessExit(Native Method)
at java.lang.UNIXProcess.access$900(UNIXProcess.java:20)
at java.lang.UNIXProcess$1$1.run(UNIXProcess.java:132)
"Low Memory Detector" daemon prio=10 tid=0x08e05400 nid=0x23b2
runnable [0x00000000]
java.lang.Thread.State: RUNNABLE
"CompilerThread0" daemon prio=10 tid=0x08e03800 nid=0x23b1 waiting on
condition [0x00000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x08e01c00 nid=0x23b0 waiting
on condition [0x00000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x08dfdc00 nid=0x23af in Object.wait()
[0xb487e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x7f383480> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x7f383480> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
"Reference Handler" daemon prio=10 tid=0x08df9400 nid=0x23ae in
Object.wait() [0xb48cf000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x7f380110> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:485)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
- locked <0x7f380110> (a java.lang.ref.Reference$Lock)
"main" prio=10 tid=0x08dcb400 nid=0x23ac runnable [0xb6dae000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x7f383588> (a sun.nio.ch.Util$2)
- locked <0x7f383598> (a java.util.Collections$UnmodifiableSet)
- locked <0x7f383548> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at com.cloudera.flume.watchdog.Watchdog.launchAgent(Watchdog.java:182)
at com.cloudera.flume.watchdog.Watchdog.run(Watchdog.java:267)
at com.cloudera.flume.watchdog.FlumeWatchdog.main(FlumeWatchdog.java:77)
"VM Thread" prio=10 tid=0x08deec00 nid=0x23ad runnable
"VM Periodic Task Thread" prio=10 tid=0x08e11400 nid=0x23b3 waiting on
condition
JNI global references: 1221
Heap
def new generation total 4928K, used 3763K [0x7eeb0000, 0x7f400000,
0x84400000)
eden space 4416K, 73% used [0x7eeb0000, 0x7f1dce08, 0x7f300000)
from space 512K, 100% used [0x7f380000, 0x7f400000, 0x7f400000)
to space 512K, 0% used [0x7f300000, 0x7f300000, 0x7f380000)
tenured generation total 10944K, used 50K [0x84400000, 0x84eb0000,
0x8eeb0000)
the space 10944K, 0% used [0x84400000, 0x8440ca70, 0x8440cc00, 0x84eb0000)
compacting perm gen total 12288K, used 1657K [0x8eeb0000,
0x8fab0000, 0x92eb0000)
the space 12288K, 13% used [0x8eeb0000, 0x8f04e7c8, 0x8f04e800, 0x8fab0000)
ro space 10240K, 61% used [0x92eb0000, 0x934d8a38, 0x934d8c00, 0x938b0000)
rw space 12288K, 60% used [0x938b0000, 0x93fe8ec0, 0x93fe9000, 0x944b0000)
2011-08-16 15:48:10
Full thread dump Java HotSpot(TM) Client VM (19.1-b02 mixed mode, sharing):
"MultiThreadedHttpConnectionManager cleanup" daemon prio=10 tid=0x08452800
nid=0x2900 in Object.wait() [0xb38e1000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x8455a0e0> (a java.lang.ref.ReferenceQueue$Lock)
at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ReferenceQueueThread.run(MultiThreadedHttpConnectionManager.java:1082)
"pool-3-thread-1" prio=10 tid=0x08435400 nid=0x28fd waiting on
condition [0xb39d4000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x84541250> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"pool-1-thread-1" prio=10 tid=0x088d0800 nid=0x28fc runnable [0xb3a76000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
- locked <0x84551078> (a java.io.BufferedInputStream)
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
at com.cloudera.flume.handlers.thrift.TStatsTransport.read(TStatsTransport.java:59)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$Processor.process(ThriftFlumeEventServer.java:224)
at org.apache.thrift.server.TSaneThreadPoolServer$WorkerProcess.run(TSaneThreadPoolServer.java:280)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
"LeaseChecker" daemon prio=10 tid=0x08431000 nid=0x2888 waiting on
condition [0xb3a25000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.run(DFSClient.java:1167)
at java.lang.Thread.run(Thread.java:662)
"pool-4-thread-1" prio=10 tid=0x08409800 nid=0x2884 waiting on
condition [0xb3b69000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x8452c830> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)
"Roll-TriggerThread-1" prio=10 tid=0x0866c400 nid=0x2882 waiting on
condition [0xb3c0b000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
at com.cloudera.util.Clock.sleep(Clock.java:88)
at com.cloudera.flume.handlers.rolling.RollSink$TriggerThread.run(RollSink.java:148)
"Roll-TriggerThread-0" prio=10 tid=0x08450000 nid=0x2880 waiting on
condition [0xb3cad000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
at com.cloudera.util.Clock.sleep(Clock.java:88)
at com.cloudera.flume.handlers.rolling.RollSink$TriggerThread.run(RollSink.java:148)
"Thrift server:class org.apache.thrift.TProcessorFactory on class
org.apache.thrift.transport.TSaneServerSocket" prio=10 tid=0x0844cc00
nid=0x287f runnable [0xb3cfe000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)
- locked <0x8452ede0> (a java.net.SocksSocketImpl)
at java.net.ServerSocket.implAccept(ServerSocket.java:462)
at java.net.ServerSocket.accept(ServerSocket.java:430)
at org.apache.thrift.transport.TSaneServerSocket.acceptImpl(TSaneServerSocket.java:134)
at org.apache.thrift.transport.TServerTransport.accept(TServerTransport.java:31)
at org.apache.thrift.server.TSaneThreadPoolServer$1.run(TSaneThreadPoolServer.java:175)
"logicalNode HDFSObservationConsumer-1-23" prio=10 tid=0x0844d800
nid=0x287e waiting on condition [0xb3eba000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.cloudera.flume.handlers.thrift.ThriftEventSource.close(ThriftEventSource.java:191)
- locked <0x8452ef60> (a com.cloudera.flume.handlers.thrift.ThriftEventSource)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.ensureClosed(DirectDriver.java:142)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.errorCleanup(DirectDriver.java:163)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:116)
"logicalNode ObservationConsumer-1-22" prio=10 tid=0x0844c000
nid=0x287d waiting on condition [0xb3e69000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x8453d490> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
at com.cloudera.flume.handlers.thrift.ThriftEventSource.next(ThriftEventSource.java:209)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:105)
"DestroyJavaVM" prio=10 tid=0x083e7000 nid=0x23b8 waiting on condition
[0x00000000]
java.lang.Thread.State: RUNNABLE
"ChokeManager" daemon prio=10 tid=0x088aa000 nid=0x23d1 waiting on
condition [0xb3f0c000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.cloudera.flume.handlers.debug.ChokeManager.run(ChokeManager.java:143)
"Heartbeat" prio=10 tid=0x088b3400 nid=0x23d0 waiting on condition [0xb3f5c000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at com.cloudera.util.Clock$DefaultClock.doSleep(Clock.java:62)
at com.cloudera.util.Clock.sleep(Clock.java:88)
at com.cloudera.flume.agent.LivenessManager$HeartbeatThread.run(LivenessManager.java:234)
"Check config" prio=10 tid=0x088b1c00 nid=0x23cf waiting on condition
[0xb3fad000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x84490da8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at com.cloudera.flume.agent.LivenessManager.dequeueCheckConfig(LivenessManager.java:70)
at com.cloudera.flume.agent.LivenessManager$CheckConfigThread.run(LivenessManager.java:205)
"Thread-1" prio=10 tid=0x08899400 nid=0x23ce waiting for monitor entry
[0xb3ffe000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.cloudera.flume.handlers.thrift.ThriftEventSource.getMetrics(ThriftEventSource.java:93)
- waiting to lock <0x8452ef60> (a
com.cloudera.flume.handlers.thrift.ThriftEventSource)
at com.cloudera.flume.core.EventSource$Base.getReports(EventSource.java:186)
at com.cloudera.flume.agent.LogicalNode.getReports(LogicalNode.java:309)
- locked <0x8451f8d8> (a com.cloudera.flume.agent.LogicalNode)
at com.cloudera.flume.reporter.MasterReportPusher$PusherThread.querySrcSinkReports(MasterReportPusher.java:102)
at com.cloudera.flume.reporter.MasterReportPusher$PusherThread.sendReports(MasterReportPusher.java:110)
at com.cloudera.flume.reporter.MasterReportPusher$PusherThread.run(MasterReportPusher.java:119)
"Timer-0" daemon prio=10 tid=0x088b0800 nid=0x23cd in Object.wait() [0xb415c000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0x844d02e0> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:462)
"16496587@qtp-2859291-1 - Acceptor0
SelectChannelConnector@0.0.0.0:35862" prio=10 tid=0x08880800
nid=0x23cc runnable [0xb41ad000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x844d0398> (a sun.nio.ch.Util$2)
- locked <0x844d03a8> (a java.util.Collections$UnmodifiableSet)
- locked <0x844d0358> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:498)
at org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192)
at org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124)
at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708)
at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
"23047631@qtp-2859291-0" prio=10 tid=0x08876000 nid=0x23cb in
Object.wait() [0xb41fe000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:626)
- locked <0x844a6f28> (a org.mortbay.thread.QueuedThreadPool$PoolThread)
"Low Memory Detector" daemon prio=10 tid=0x08421000 nid=0x23c5
runnable [0x00000000]
java.lang.Thread.State: RUNNABLE
"CompilerThread0" daemon prio=10 tid=0x0841f400 nid=0x23c4 waiting on
condition [0x00000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x0841dc00 nid=0x23c3 waiting
on condition [0x00000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x08416800 nid=0x23bb in Object.wait()
[0xb48e3000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x84402268> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
"Reference Handler" daemon prio=10 tid=0x08415000 nid=0x23ba in
Object.wait() [0xb4934000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:485)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
- locked <0x844022f8> (a java.lang.ref.Reference$Lock)
"VM Thread" prio=10 tid=0x0840a800 nid=0x23b9 runnable
"VM Periodic Task Thread" prio=10 tid=0x0842d000 nid=0x23c6 waiting on
condition
JNI global references: 1619
Heap
def new generation total 4928K, used 4244K [0x7eeb0000, 0x7f400000,
0x84400000)
eden space 4416K, 93% used [0x7eeb0000, 0x7f2bcd28, 0x7f300000)
from space 512K, 19% used [0x7f300000, 0x7f318650, 0x7f380000)
to space 512K, 0% used [0x7f380000, 0x7f380000, 0x7f400000)
tenured generation total 10944K, used 3874K [0x84400000,
0x84eb0000, 0x8eeb0000)
the space 10944K, 35% used [0x84400000, 0x847c8878, 0x847c8a00, 0x84eb0000)
compacting perm gen total 12288K, used 11098K [0x8eeb0000,
0x8fab0000, 0x92eb0000)
the space 12288K, 90% used [0x8eeb0000, 0x8f986860, 0x8f986a00, 0x8fab0000)
ro space 10240K, 61% used [0x92eb0000, 0x934d8a38, 0x934d8c00, 0x938b0000)
rw space 12288K, 60% used [0x938b0000, 0x93fe8ec0, 0x93fe9000, 0x944b0000)
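
In case it helps anyone reading the dump above: a minimal sketch, in Python, of a script that scans a HotSpot thread dump for threads that are "waiting to lock" a monitor and reports which thread shows that monitor as "locked". The function name find_blocked and the regular expressions are my own assumptions for illustration, not part of Flume or the JDK, and the matching is deliberately simple (a thread parked in Object.wait() may also print "locked" for a monitor it has released, so treat the holder as a hint only).

```python
import re
import sys

# Assumed patterns for HotSpot-style dump lines (illustrative, not exhaustive).
HEADER = re.compile(r'^"(?P<name>[^"]*)')                  # "Thread-1" prio=10 ...
LOCKED = re.compile(r'-\s+locked <(0x[0-9a-f]+)>')         # - locked <0x...>
WAITING = re.compile(r'-\s+waiting to lock <(0x[0-9a-f]+)>')


def find_blocked(dump_text):
    """Return (waiter, lock_id, holder) tuples; holder is None if not found."""
    holders = {}   # lock id -> first thread seen with "locked <id>"
    waiters = []   # (thread name, lock id) for "waiting to lock <id>"
    current = None
    for raw in dump_text.splitlines():
        line = raw.strip()
        m = HEADER.match(line)
        if m:
            # A new thread section starts at each line beginning with a quote.
            current = m.group("name")
            continue
        if current is None:
            continue
        m = LOCKED.search(line)
        if m:
            holders.setdefault(m.group(1), current)
        m = WAITING.search(line)
        if m:
            waiters.append((current, m.group(1)))
    # Resolve holders only after the whole dump is read, since the holder's
    # section may appear after the waiter's section.
    return [(w, lock, holders.get(lock)) for w, lock in waiters]


if __name__ == "__main__":
    # Reads the dump from standard input and prints one line per blocked thread.
    for waiter, lock, holder in find_blocked(sys.stdin.read()):
        print("%s waits for <%s>, shown as locked by %s" % (waiter, lock, holder))
```

In the collector dump above, the only "waiting to lock" entry is "Thread-1" (the MasterReportPusher thread) on <0x8452ef60>, and that monitor is shown as locked by "logicalNode HDFSObservationConsumer-1-23", which is sleeping inside ThriftEventSource.close.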