You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 19:01:04 UTC

[GitHub] [beam] damccorm opened a new issue, #20689: Kafka commitOffsetsInFinalize OOM on Flink

damccorm opened a new issue, #20689:
URL: https://github.com/apache/beam/issues/20689

   Hi,
   
   I upgraded Beam from 2.19.0 (flink 1.9) to 2.25.0 (flink 1.11.1),And then it doesn't work。 
   
    
   The cluster version I use is:
       jdk1.8
       apache-zookeeper-3.4.14
       hadoop-3.2.1
       flink-1.11.1
    
   Submit job use command:
   ```
   
   bin/flink run -m yarn-cluster -ynm "xxx" -yjm 2048 -ytm 8192 ./some-executable.jar \
   --appName=xxxname
   \
   --runner=FlinkRunner \
   --parallelism=2 \
   --sourceKafkaUrl=192.168.12.13:9092 \
   --sourceTopic=sometopic
   \
   --sourceGroupId=guofy-host-dev \
   --sinkKafkaUrl=192.168.12.13:9092 \
   --debug=true \
   &
   
   ```
   
    
   Yarn is ok but taskmanager.log has exceptioins. 
   Kafka comsumer into an infinite loop, and finally report 
   java.lang.OutOfMemoryError: GC overhead limit is exceeded.
   Below is a partial log. Please help to analyze and solve it.
   
   ```
    
   2020-10-27 21:54:19.685 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-6,
   groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser 
   - Kafka version: 2.6.0
   2020-10-27 21:54:19.685 INFO  org.apache.kafka.clients.Metadata  - [Consumer
   clientId=consumer-guofy-host-dev-6, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.685
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.685
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.685
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859685
   2020-10-27 21:54:19.685
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859685
   2020-10-27 21:54:19.686
   INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
   2020-10-27
   21:54:19.686 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
   2020-10-27
   21:54:19.686 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Seeking to LATEST offset of partition g
   uofangyu-vm-dev-0
   2020-10-27
   21:54:19.686 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Seeking to LATEST offset of partition g
   uofangyu-vm-dev-0
   2020-10-27
   21:54:19.688 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.688 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.690 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7,
   groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Resetting offset for partition guofangy
   u-vm-dev-0
   to offset 0.
   2020-10-27 21:54:19.690 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev]
   Resetting offset for partition guofangy
   u-vm-dev-0 to offset 0.
   2020-10-27 21:54:19.691 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev]
   Seeking to LATEST offset of partition g
   uofangyu-vm-dev-0
   2020-10-27 21:54:19.691 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev]
   Seeking to LATEST offset of partition g
   uofangyu-vm-dev-0
   2020-10-27 21:54:19.693 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev]
   Resetting offset for partition guofangy
   u-vm-dev-0 to offset 0.
   2020-10-27 21:54:19.693 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev]
   Resetting offset for partition guofangy
   u-vm-dev-0 to offset 0.
   2020-10-27 21:54:19.701 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource 
   - Reader-0: Returning from consumer pool loop
   2020-10-27 21:54:19.705 INFO  org.apache.kafka.clients.consumer.ConsumerConfig 
   - ConsumerConfig values:
   allow.auto.create.topics = true
   auto.commit.interval.ms = 5000
   auto.offset.reset
   = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs = true
   client.dns.lookup = use_all_dns_ips
   client.id
   = consumer-guofy-host-dev-8
   client.rack =
   connections.max.idle.ms = 540000
   default.api.timeout.ms
   = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes = 52428800
   fetch.max.wait.ms
   = 500
   fetch.min.bytes = 1
   group.id = guofy-host-dev
   group.instance.id = null
   heartbeat.interval.ms
   = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.705 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
   allow.auto.create.topics
   = true
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs
   = true
   client.dns.lookup = use_all_dns_ips
   client.id = consumer-guofy-host-dev-8
   client.rack =
   connections.max.idle.ms
   = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes
   = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.707 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.707
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.707 INFO 
   org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.707
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.707
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859707
   2020-10-27 21:54:19.707
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859707
   2020-10-27 21:54:19.708
   INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-8,
   groupId=guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
   2020-10-27 21:54:19.708 INFO 
   org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev]
   Subscribed to partition(s): guofangyu-vm-dev-1
   2020-10-27 21:54:19.708 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
   - [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev] Seeking to offset 0 for partition
   guofangyu-vm-dev-1
   2020-10-27 21:54:19.708 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  -
   [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev] Seeking to offset 0 for partition
   guofangyu-vm-dev-1
   2020-10-27 21:54:19.708 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource 
   - Reader-1: reading from guofangyu-vm-dev-1 starting at offset 0
   2020-10-27 21:54:19.709 INFO  org.apache.kafka.clients.consumer.ConsumerConfig 
   - ConsumerConfig values:
   allow.auto.create.topics = true
   auto.commit.interval.ms = 5000
   auto.offset.reset
   = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs = true
   client.dns.lookup = use_all_dns_ips
   client.id
   = consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9
   client.rack =
   connections.max.idle.ms
   = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes
   = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = Reader-1_offset_consumer_819035414_guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.709 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
   allow.auto.create.topics
   = true
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs
   = true
   client.dns.lookup = use_all_dns_ips
   client.id = consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9
   client.rack
   =
   connections.max.idle.ms = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics
   = true
   fetch.max.bytes = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = Reader-1_offset_consumer_819035414_guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.710 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.710
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.710 INFO 
   org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.710
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.710
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859710
   2020-10-27 21:54:19.710
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859710
   2020-10-27 21:54:19.711
   INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
   2020-10-27
   21:54:19.711 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
   2020-10-27
   21:54:19.711 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-8,
   groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27 21:54:19.711 INFO  org.apache.kafka.clients.Metadata 
   - [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.711 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Seeking to LATEST offset of partition g
   uofangyu-vm-dev-1
   2020-10-27
   21:54:19.711 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Seeking to LATEST offset of partition g
   uofangyu-vm-dev-1
   2020-10-27
   21:54:19.714 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.714 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.716 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9,
   groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Resetting offset for partition guofangy
   u-vm-dev-1
   to offset 0.
   2020-10-27 21:54:19.716 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev]
   Resetting offset for partition guofangy
   u-vm-dev-1 to offset 0.
   2020-10-27 21:54:19.717 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev]
   Seeking to LATEST offset of partition g
   uofangyu-vm-dev-1
   2020-10-27 21:54:19.717 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev]
   Seeking to LATEST offset of partition g
   uofangyu-vm-dev-1
   2020-10-27 21:54:19.719 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev]
   Resetting offset for partition guofangy
   u-vm-dev-1 to offset 0.
   2020-10-27 21:54:19.719 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev]
   Resetting offset for partition guofangy
   u-vm-dev-1 to offset 0.
   2020-10-27 21:54:19.727 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource 
   - Reader-1: Returning from consumer pool loop
   2020-10-27 21:54:19.729 INFO  org.apache.kafka.clients.consumer.ConsumerConfig 
   - ConsumerConfig values:
   allow.auto.create.topics = true
   auto.commit.interval.ms = 5000
   auto.offset.reset
   = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs = true
   client.dns.lookup = use_all_dns_ips
   client.id
   = consumer-guofy-host-dev-10
   client.rack =
   connections.max.idle.ms = 540000
   default.api.timeout.ms
   = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes = 52428800
   fetch.max.wait.ms
   = 500
   fetch.min.bytes = 1
   group.id = guofy-host-dev
   group.instance.id = null
   heartbeat.interval.ms
   = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.729 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
   allow.auto.create.topics
   = true
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs
   = true
   client.dns.lookup = use_all_dns_ips
   client.id = consumer-guofy-host-dev-10
   client.rack =
   connections.max.idle.ms
   = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes
   = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.731 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.731
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.731 INFO 
   org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.731
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.731
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859731
   2020-10-27 21:54:19.731
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859731
   2020-10-27 21:54:19.732
   INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-10,
   groupId=guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
   2020-10-27 21:54:19.732 INFO 
   org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev]
   Subscribed to partition(s): guofangyu-vm-dev-0
   2020-10-27 21:54:19.732 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
   - [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev] Seeking to offset 0 for partition
   guofangyu-vm-dev-0
   2020-10-27 21:54:19.732 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  -
   [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev] Seeking to offset 0 for partition
   guofangyu-vm-dev-0
   2020-10-27 21:54:19.732 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource 
   - Reader-0: reading from guofangyu-vm-dev-0 starting at offset 0
   2020-10-27 21:54:19.732 INFO  org.apache.kafka.clients.consumer.ConsumerConfig 
   - ConsumerConfig values:
   allow.auto.create.topics = true
   auto.commit.interval.ms = 5000
   auto.offset.reset
   = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs = true
   client.dns.lookup = use_all_dns_ips
   client.id
   = consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11
   client.rack =
   connections.max.idle.ms
   = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes
   = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = Reader-0_offset_consumer_803275858_guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.732 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
   allow.auto.create.topics
   = true
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs
   = true
   client.dns.lookup = use_all_dns_ips
   client.id = consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11
   client.rack
   =
   connections.max.idle.ms = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics
   = true
   fetch.max.bytes = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = Reader-0_offset_consumer_803275858_guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.734 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.734
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.734 INFO 
   org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.734
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.734
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859734
   2020-10-27 21:54:19.734
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859734
   2020-10-27 21:54:19.734
   INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
   2020-10-27
   21:54:19.734 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
   2020-10-27
   21:54:19.735 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-10,
   groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27 21:54:19.735 INFO  org.apache.kafka.clients.Metadata 
   - [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.735 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Seeking to LATEST offset of partition
   guofangyu-vm-dev-0
   2020-10-27
   21:54:19.735 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Seeking to LATEST offset of partition
   guofangyu-vm-dev-0
   2020-10-27
   21:54:19.737 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.737 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27
   21:54:19.739 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11,
   groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Resetting offset for partition guofang
   yu-vm-dev-0
   to offset 0.
   2020-10-27 21:54:19.739 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev]
   Resetting offset for partition guofang
   yu-vm-dev-0 to offset 0.
   2020-10-27 21:54:19.740 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev]
   Seeking to LATEST offset of partition
   guofangyu-vm-dev-0
   2020-10-27 21:54:19.740 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev]
   Seeking to LATEST offset of partition
   guofangyu-vm-dev-0
   2020-10-27 21:54:19.741 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev]
   Resetting offset for partition guofang
   yu-vm-dev-0 to offset 0.
   2020-10-27 21:54:19.741 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState 
   - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev]
   Resetting offset for partition guofang
   yu-vm-dev-0 to offset 0.
   2020-10-27 21:54:19.750 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource 
   - Reader-0: Returning from consumer pool loop
   2020-10-27 21:54:19.752 INFO  org.apache.kafka.clients.consumer.ConsumerConfig 
   - ConsumerConfig values:
   allow.auto.create.topics = true
   auto.commit.interval.ms = 5000
   auto.offset.reset
   = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs = true
   client.dns.lookup = use_all_dns_ips
   client.id
   = consumer-guofy-host-dev-12
   client.rack =
   connections.max.idle.ms = 540000
   default.api.timeout.ms
   = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes = 52428800
   fetch.max.wait.ms
   = 500
   fetch.min.bytes = 1
   group.id = guofy-host-dev
   group.instance.id = null
   heartbeat.interval.ms
   = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.752 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
   allow.auto.create.topics
   = true
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs
   = true
   client.dns.lookup = use_all_dns_ips
   client.id = consumer-guofy-host-dev-12
   client.rack =
   connections.max.idle.ms
   = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes
   = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.754 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.754
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.754 INFO 
   org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.754
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.754
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859754
   2020-10-27 21:54:19.754
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859754
   2020-10-27 21:54:19.754
   INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-12,
   groupId=guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
   2020-10-27 21:54:19.754 INFO 
   org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev]
   Subscribed to partition(s): guofangyu-vm-dev-1
   2020-10-27 21:54:19.755 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
   - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev] Seeking to offset 0 for partition
   guofangyu-vm-dev-1
   2020-10-27 21:54:19.755 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  -
   [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev] Seeking to offset 0 for partition
   guofangyu-vm-dev-1
   2020-10-27 21:54:19.755 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource 
   - Reader-1: reading from guofangyu-vm-dev-1 starting at offset 0
   2020-10-27 21:54:19.755 INFO  org.apache.kafka.clients.consumer.ConsumerConfig 
   - ConsumerConfig values:
   allow.auto.create.topics = true
   auto.commit.interval.ms = 5000
   auto.offset.reset
   = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs = true
   client.dns.lookup = use_all_dns_ips
   client.id
   = consumer-Reader-1_offset_consumer_576918038_guofy-host-dev-13
   client.rack =
   connections.max.idle.ms
   = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics = true
   fetch.max.bytes
   = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = Reader-1_offset_consumer_576918038_guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.755 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
   allow.auto.create.topics
   = true
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [10.226.132.131:9092]
   check.crcs
   = true
   client.dns.lookup = use_all_dns_ips
   client.id = consumer-Reader-1_offset_consumer_576918038_guofy-host-dev-13
   client.rack
   =
   connections.max.idle.ms = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   exclude.internal.topics
   = true
   fetch.max.bytes = 52428800
   fetch.max.wait.ms = 500
   fetch.min.bytes = 1
   group.id = Reader-1_offset_consumer_576918038_guofy-host-dev
   group.instance.id
   = null
   heartbeat.interval.ms = 3000
   interceptor.classes = []
   internal.leave.group.on.close = true
   internal.throw.on.fetch.stable.offset.unsupported
   = false
   isolation.level = read_uncommitted
   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   max.partition.fetch.bytes
   = 1048576
   max.poll.interval.ms = 300000
   max.poll.records = 500
   metadata.max.age.ms = 300000
   metric.reporters
   = []
   metrics.num.samples = 2
   metrics.recording.level = INFO
   metrics.sample.window.ms = 30000
   partition.assignment.strategy
   = [class org.apache.kafka.clients.consumer.RangeAssignor]
   receive.buffer.bytes = 524288
   reconnect.backoff.max.ms
   = 1000
   reconnect.backoff.ms = 50
   request.timeout.ms = 30000
   retry.backoff.ms = 100
   sasl.client.callback.handler.class
   = null
   sasl.jaas.config = null
   sasl.kerberos.kinit.cmd = /usr/bin/kinit
   sasl.kerberos.min.time.before.relogin
   = 60000
   sasl.kerberos.service.name = null
   sasl.kerberos.ticket.renew.jitter = 0.05
   sasl.kerberos.ticket.renew.window.factor
   = 0.8
   sasl.login.callback.handler.class = null
   sasl.login.class = null
   sasl.login.refresh.buffer.seconds
   = 300
   sasl.login.refresh.min.period.seconds = 60
   sasl.login.refresh.window.factor = 0.8
   sasl.login.refresh.window.jitter
   = 0.05
   sasl.mechanism = GSSAPI
   security.protocol = PLAINTEXT
   security.providers = null
   send.buffer.bytes
   = 131072
   session.timeout.ms = 10000
   ssl.cipher.suites = null
   ssl.enabled.protocols = [TLSv1.2]
   ssl.endpoint.identification.algorithm
   = https
   ssl.engine.factory.class = null
   ssl.key.password = null
   ssl.keymanager.algorithm = SunX509
   ssl.keystore.location
   = null
   ssl.keystore.password = null
   ssl.keystore.type = JKS
   ssl.protocol = TLSv1.2
   ssl.provider
   = null
   ssl.secure.random.implementation = null
   ssl.trustmanager.algorithm = PKIX
   ssl.truststore.location
   = null
   ssl.truststore.password = null
   ssl.truststore.type = JKS
   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    
   2020-10-27
   21:54:19.757 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.757
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
   2020-10-27 21:54:19.757 INFO 
   org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.757
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
   2020-10-27 21:54:19.757
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859757
   2020-10-27 21:54:19.757
   INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859757
   2020-10-27 21:54:19.757
   INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev]
   Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   2020-10-27 21:54:19.757 INFO  org.apache.kafka.clients.Metadata 
   - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
   
   ```
    
   
   It worked fine for me with beam version 2.19.0,But 2.25.0 doesn't work。
   
   Imported from Jira [BEAM-11148](https://issues.apache.org/jira/browse/BEAM-11148). Original Jira may contain additional context.
   Reported by: titansfy.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1468706836

   None that I'm aware of


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1468732339

   The effect of this option is set `commitOffsets()` which `setCommitOffsetEnabled(true)`. As far as I know ReadFromKafkaViaSDF is not quite working on Flink. This is likely related to #20979


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1468704732

   This seems like P2, empirically. Have we got more reports of similar problems?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1246004515

   @johnjcasey has been looking at KafkaIO and may have updates on this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yianni commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by "yianni (via GitHub)" <gi...@apache.org>.
yianni commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1514843834

   We faced this issue with the Flink runner, but we can't give more details since it was long ago.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1246969395

   I have very little idea where we could be leaking memory in commitOffsets, but the code hasn't substantially changed since the original bug report.
   
   @Abacn can you take a look at this while working on Kafka testing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #20689: Kafka commitOffsetsInFinalize OOM on Flink

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #20689:
URL: https://github.com/apache/beam/issues/20689#issuecomment-1246005503

   Bringing over a significant comment:
   
   > After a lot of attempts, when I deleted this line of code, it worked fine.
   >```
   >    PCollection<KV<String, byte[]>> messages =
   >        pipeline
   >            .apply("Read Kafka", KafkaIO.<String, byte[]>read()
   >                .withBootstrapServers(options.getSourceKafkaUrl()) 
   >                .withTopic(options.getSourceTopic())
   >                .withKeyDeserializer(StringDeserializer.class)
   >                .withValueDeserializer(ByteArrayDeserializer.class)
   >                .withConsumerConfigUpdates(ImmutableMap.of(
   >                    "enable.auto.commit", true,
   >                    "group.id", options.getSourceGroupId()))
   >                //.commitOffsetsInFinalize() // This is the code that caused the memory leak. I think this is a serious BUG.
   >                .withoutMetadata()
   >            );
   >
   >```
   > Looking at THE JVM GC, the old age space is not rising as much as before


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org