You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "ccagml (JIRA)" <ji...@apache.org> on 2017/10/27 06:26:00 UTC

[jira] [Commented] (FLUME-3188) [ERROR - org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:544)] run() exiting due to uncaught error

    [ https://issues.apache.org/jira/browse/FLUME-3188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221785#comment-16221785 ] 

ccagml commented on FLUME-3188:
-------------------------------

# list the sources, sinks and channels in the agent
a1.sources = r1
a1.sinks = k1 k2 k5 k6 k7
a1.channels = c1 c2 c5 c6 c7

# set channels for source
a1.sources.r1.channels = c1 c2 c5 c6 c7

# set channel for sinks
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k5.channel = c5
a1.sinks.k6.channel = c6
a1.sinks.k7.channel = c7

a1.sources.r1.type = thrift
a1.sources.r1.channels = c1 c2 c5 c6 c7
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 57888
a1.sources.r1.threads = 200

# channel selector configuration
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = State
a1.sources.r1.selector.mapping.c_log = c1
a1.sources.r1.selector.mapping.v_log = c2
a1.sources.r1.selector.mapping.l_log = c5
a1.sources.r1.selector.mapping.a_log = c6
a1.sources.r1.selector.mapping.r_log = c7


a1.channels.c1.type = file
a1.channels.c1.dataDirs = /data/agent/a1c1/flume-data
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.checkpointDir = /data/agent/a1c1/flume-checkpoint1
a1.channels.c1.backupCheckpointDir = /data/agent/a1c1/flume-checkpoint2
a1.channels.c1.capacity = 2500000
a1.channels.c1.transactionCapacity = 28000
a1.channels.c1.maxFileSize=52428800
a1.channels.c1.checkpointInterval=5000


a1.channels.c2.type = file
a1.channels.c2.dataDirs = /data/agent/a1c2/flume-data
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.checkpointDir = /data/agent/a1c2/flume-checkpoint1
a1.channels.c2.backupCheckpointDir = /data/agent/a1c2/flume-checkpoint2
a1.channels.c2.capacity = 2500000
a1.channels.c2.transactionCapacity = 28000
a1.channels.c2.maxFileSize=52428800
a1.channels.c2.checkpointInterval=5000

a1.channels.c5.type = file
a1.channels.c5.dataDirs = /data/agent/a1c5/flume-data
a1.channels.c5.useDualCheckpoints=true
a1.channels.c5.checkpointDir = /data/agent/a1c5/flume-checkpoint1
a1.channels.c5.backupCheckpointDir = /data/agent/a1c5/flume-checkpoint2
a1.channels.c5.capacity = 2500000
a1.channels.c5.transactionCapacity = 28000
a1.channels.c5.maxFileSize=52428800
a1.channels.c5.checkpointInterval=5000

a1.channels.c6.type = file
a1.channels.c6.dataDirs = /data/agent/a1c6/flume-data
a1.channels.c6.useDualCheckpoints=true
a1.channels.c6.checkpointDir = /data/agent/a1c6/flume-checkpoint1
a1.channels.c6.backupCheckpointDir = /data/agent/a1c6/flume-checkpoint2
a1.channels.c6.capacity = 2500000
a1.channels.c6.transactionCapacity = 28000
a1.channels.c6.maxFileSize=52428800
a1.channels.c6.checkpointInterval=5000

a1.channels.c7.type = file
a1.channels.c7.dataDirs = /data/agent/a1c7/flume-data
a1.channels.c7.useDualCheckpoints=true
a1.channels.c7.checkpointDir = /data/agent/a1c7/flume-checkpoint1
a1.channels.c7.backupCheckpointDir = /data/agent/a1c7/flume-checkpoint2
a1.channels.c7.capacity = 2500000
a1.channels.c7.transactionCapacity = 28000
a1.channels.c7.maxFileSize=52428800
a1.channels.c7.checkpointInterval=5000



a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = c_log
a1.sinks.k1.brokerList = localhost
a1.sinks.k1.requiredAcks = -1
# a1.sinks.k1.kafka.producer.batch.size= 61440
a1.sinks.k1.kafka.producer.max.request.size=3145728
a1.sinks.k1.channel = c1



a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.topic = v_log
a1.sinks.k2.brokerList = localhost
a1.sinks.k2.requiredAcks = -1
# a1.sinks.k2.kafka.producer.batch.size= 61440
a1.sinks.k2.kafka.producer.max.request.size=3145728
a1.sinks.k2.channel = c2


a1.sinks.k5.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k5.topic = l_log
a1.sinks.k5.brokerList = localhost
a1.sinks.k5.requiredAcks = -1
# a1.sinks.k5.kafka.producer.batch.size= 61440
a1.sinks.k5.kafka.producer.max.request.size=3145728
a1.sinks.k5.channel = c5


a1.sinks.k6.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k6.topic = a_log
a1.sinks.k6.brokerList = localhost
a1.sinks.k6.requiredAcks = -1
# a1.sinks.k6.kafka.producer.batch.size= 61440
a1.sinks.k6.kafka.producer.max.request.size=3145728
a1.sinks.k6.channel = c6


a1.sinks.k7.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k7.topic = r_log
a1.sinks.k7.brokerList = localhost
a1.sinks.k7.requiredAcks = -1
# a1.sinks.k7.kafka.producer.batch.size= 61440
a1.sinks.k7.kafka.producer.max.request.size=3145728
a1.sinks.k7.channel = c7



> [ERROR - org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:544)] run() exiting due to uncaught error
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3188
>                 URL: https://issues.apache.org/jira/browse/FLUME-3188
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.7.0
>         Environment: docker,centos7,openjdk 1.8.0_131
>            Reporter: ccagml
>
> When this exception is thrown, Flume stops serving requests. How do I fix it?
> 2017/10/14 afternoon12:01:35 2017-10-14 12:01:35,995 (Thread-1) [ERROR - org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:544)] run() exiting due to uncaught error
> 2017/10/14 afternoon12:01:35 java.lang.OutOfMemoryError
> 2017/10/14 afternoon12:01:35	at sun.misc.Unsafe.allocateMemory(Native Method)
> 2017/10/14 afternoon12:01:35	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
> 2017/10/14 afternoon12:01:35	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> 2017/10/14 afternoon12:01:35	at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> 2017/10/14 afternoon12:01:35	at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> 2017/10/14 afternoon12:01:35	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> 2017/10/14 afternoon12:01:35	at org.apache.thrift.transport.TNonblockingSocket.read(TNonblockingSocket.java:141)
> 2017/10/14 afternoon12:01:35	at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:515)
> 2017/10/14 afternoon12:01:35	at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.read(AbstractNonblockingServer.java:355)
> 2017/10/14 afternoon12:01:35	at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:202)
> 2017/10/14 afternoon12:01:35	at org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.select(TThreadedSelectorServer.java:576)
> 2017/10/14 afternoon12:01:35	at org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:536)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)