You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "raymond zhao (JIRA)" <ji...@apache.org> on 2017/10/18 10:07:01 UTC

[jira] [Commented] (FLUME-3073) KafkaSource EXCEPTION java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776)

    [ https://issues.apache.org/jira/browse/FLUME-3073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16209098#comment-16209098 ] 

raymond zhao commented on FLUME-3073:
-------------------------------------

I have the same problem too.

I just use kafka source with kafka channel and hdfs sink.

one kafka source (3 partitions), three kafka channels (one per source partition), and three sinks (one per channel)

-----------------   Flume config  --------------------

# vcf kafka agent configuration
vcf_kfk_agent.sources = vcf_kfk_source
vcf_kfk_agent.sinks = vcf_kfk_sink1 vcf_kfk_sink2 vcf_kfk_sink3
vcf_kfk_agent.channels = vcf_kfk_channel1 vcf_kfk_channel2 vcf_kfk_channel3

# Describe the vcf kafka agent source
vcf_kfk_agent.sources.vcf_kfk_source.type = org.apache.flume.source.kafka.KafkaSource
vcf_kfk_agent.sources.vcf_kfk_source.channels = vcf_kfk_channel1 vcf_kfk_channel2 vcf_kfk_channel3
vcf_kfk_agent.sources.vcf_kfk_source.batchSize = 5000
vcf_kfk_agent.sources.vcf_kfk_source.batchDurationMillis = 2000
vcf_kfk_agent.sources.vcf_kfk_source.kafka.bootstrap.servers = 10.1.252.113:9092
vcf_kfk_agent.sources.vcf_kfk_source.kafka.topics = vcf_flume_source_topic
vcf_kfk_agent.sources.vcf_kfk_source.kafka.consumer.group.id = flume.vcf.source.group

# Describe the vcf kafka agent sink
vcf_kfk_agent.sinks.vcf_kfk_sink1.type = hdfs
vcf_kfk_agent.sinks.vcf_kfk_sink1.channel = vcf_kfk_channel1
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.path = hdfs://hdfsCluster/project/data_analysis_platform/tanyun/flume/vcf_data/ins/%Y%m%d
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.filePrefix = vcf-stage1
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.fileType = DataStream
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.writeFormat = Text
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.round = true
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rollInterval = 3600
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rollSize = 128000000
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.rollCount = 0
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.batchSize = 20
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.roundValue = 1
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.roundUnit = minute
vcf_kfk_agent.sinks.vcf_kfk_sink1.hdfs.useLocalTimeStamp = true

vcf_kfk_agent.sinks.vcf_kfk_sink2.type = hdfs
vcf_kfk_agent.sinks.vcf_kfk_sink2.channel = vcf_kfk_channel2
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.path = hdfs://hdfsCluster/project/data_analysis_platform/tanyun/flume/vcf_data/ins/%Y%m%d
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.filePrefix = vcf-stage2
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.fileType = DataStream
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.writeFormat = Text
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.round = true
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rollInterval = 3600
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rollSize = 128000000
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.rollCount = 0
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.batchSize = 20
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.roundValue = 1
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.roundUnit = minute
vcf_kfk_agent.sinks.vcf_kfk_sink2.hdfs.useLocalTimeStamp = true

vcf_kfk_agent.sinks.vcf_kfk_sink3.type = hdfs
vcf_kfk_agent.sinks.vcf_kfk_sink3.channel = vcf_kfk_channel3
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.path = hdfs://hdfsCluster/project/data_analysis_platform/tanyun/flume/vcf_data/ins/%Y%m%d
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.filePrefix = vcf-stage3
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.fileType = DataStream
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.writeFormat = Text
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.round = true
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rollInterval = 3600
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rollSize = 128000000
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.rollCount = 0
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.batchSize = 20
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.roundValue = 1
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.roundUnit = minute
vcf_kfk_agent.sinks.vcf_kfk_sink3.hdfs.useLocalTimeStamp = true

# Describe the vcf kafka agent channel
vcf_kfk_agent.channels.vcf_kfk_channel1.type = org.apache.flume.channel.kafka.KafkaChannel
vcf_kfk_agent.channels.vcf_kfk_channel1.defaultPartitionId = 0
vcf_kfk_agent.channels.vcf_kfk_channel1.parseAsFlumeEvent = false
vcf_kfk_agent.channels.vcf_kfk_channel1.kafka.bootstrap.servers = 10.1.252.113:9092
vcf_kfk_agent.channels.vcf_kfk_channel1.kafka.topic = vcf_flume_channel_topic
vcf_kfk_agent.channels.vcf_kfk_channel1.kafka.consumer.group.id = flume.vcf.channel.group
vcf_kfk_agent.channels.vcf_kfk_channel1.auto.offset.reset = latest

vcf_kfk_agent.channels.vcf_kfk_channel2.type = org.apache.flume.channel.kafka.KafkaChannel
vcf_kfk_agent.channels.vcf_kfk_channel2.defaultPartitionId = 1
vcf_kfk_agent.channels.vcf_kfk_channel2.parseAsFlumeEvent = false
vcf_kfk_agent.channels.vcf_kfk_channel2.kafka.bootstrap.servers = 10.1.252.113:9092
vcf_kfk_agent.channels.vcf_kfk_channel2.kafka.topic = vcf_flume_channel_topic
vcf_kfk_agent.channels.vcf_kfk_channel2.kafka.consumer.group.id = flume.vcf.channel.group
vcf_kfk_agent.channels.vcf_kfk_channel2.auto.offset.reset = latest

vcf_kfk_agent.channels.vcf_kfk_channel3.type = org.apache.flume.channel.kafka.KafkaChannel
vcf_kfk_agent.channels.vcf_kfk_channel3.defaultPartitionId = 2
vcf_kfk_agent.channels.vcf_kfk_channel3.parseAsFlumeEvent = false
vcf_kfk_agent.channels.vcf_kfk_channel3.kafka.bootstrap.servers = 10.1.252.113:9092
vcf_kfk_agent.channels.vcf_kfk_channel3.kafka.topic = vcf_flume_channel_topic
vcf_kfk_agent.channels.vcf_kfk_channel3.kafka.consumer.group.id = flume.vcf.channel.group
vcf_kfk_agent.channels.vcf_kfk_channel3.auto.offset.reset = latest

-----------------------------------------------------------


--------------------  Exception info  -----------------

18 Oct 2017 17:26:35,727 ERROR [PollableSourceRunner-KafkaSource-vcf_kfk_source] (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource EXCEPTION, {}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'generation_id': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
	at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:200)
	at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
	at java.lang.Thread.run(Thread.java:745)

18 Oct 2017 17:26:40,728 ERROR [PollableSourceRunner-KafkaSource-vcf_kfk_source] (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource EXCEPTION, {}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'generation_id': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
	at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:200)
	at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
	at java.lang.Thread.run(Thread.java:745)

----------------------------------------------------------

Flume version: 1.7.0
Kafka version: 0.9.0
JDK version: 1.8



> KafkaSource EXCEPTION java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776)
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3073
>                 URL: https://issues.apache.org/jira/browse/FLUME-3073
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.7.0
>            Reporter: xuwei
>
> my agent:kafkaSource-->file channel-->hdfsSink
> my.conf
> agent.sources = kafkaSource
> agent.channels = kafka2HdfsConnectionMon
> agent.sinks = hdfsSink
> agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon
> agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon
> #---------kafkasource ------------------
> agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
> agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092
> agent.sources.kafkaSource.topic = userMon
> agent.sources.kafkaSource.groupId = flumeConsumer
> agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000 
> #---------filechannel ------------------
> agent.channels.kafka2HdfsConnectionMon.type = file
> agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint
> agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data
> #---------hdfsSink ------------------
> agent.sinks.hdfsSink.type = hdfs
> agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d
> agent.sinks.hdfsSink.hdfs.writeFormat = Text
> agent.sinks.hdfsSink.hdfs.fileType = DataStream
> agent.sinks.hdfsSink.hdfs.rollSize = 134217720
> agent.sinks.hdfsSink.hdfs.rollCount = 100000
> agent.sinks.hdfsSink.hdfs.rollInterval = 600
> agent.sinks.hdfsSink.hdfs.filePrefix=run
> agent.sinks.hdfsSink.hdfs.fileSuffix=.data
> agent.sinks.hdfsSink.hdfs.inUserPrefix=_
> agent.sinks.hdfsSink.hdfs.inUserSuffix=
> flume-env.sh:
> JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
> When the flume has been running for a period of time (about a few hours), flume will throw this exception.
> -----------------------------------------------------------------------
> 15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource EXCEPTION, {}
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': java.nio.BufferUnderflowException
>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
>         at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
>         at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
>         at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
>         at java.lang.Thread.run(Thread.java:745)
> 15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource EXCEPTION, {}
> java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776)
>         at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
>         at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
>         at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
>         at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
>         at java.lang.Thread.run(Thread.java:745)
> 15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314)  - KafkaSource EXCEPTION, {}
> java.lang.IllegalStateException: Correlation id for response (1077886) does not match request (1077833)
>         at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
>         at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
>         at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
>         at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
>         at java.lang.Thread.run(Thread.java:745)
> ...
> ...
> -----------------------------------------------------------------------
> The flume will keep producing this exception. Data can still be written to HDFS normally, but the data are duplicated.
> I think the consumer offset is not being committed correctly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)