Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/09/07 06:14:00 UTC
Spark Structured Streaming - unable to change max.poll.records (showing as 1)
Hello All,
I have a Spark Structured Streaming job which reads from Kafka, does
processing, and writes the results to Mongo, Kafka, and GCP buckets (i.e. it
is processing heavy).
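The write side of the job looks roughly like this (a simplified sketch of the
multi-sink pattern, using the df_stream defined further down; the output
topic, bucket paths, and Mongo sink details below are placeholders, not the
real ones):
```
def write_batch(batch_df, batch_id):
    # MongoDB sink - option names depend on the Mongo Spark connector
    # version in use, so they are omitted here
    # batch_df.write.format("mongodb").mode("append").save()

    # Kafka sink - the Kafka writer expects a "value" column (and optionally "key")
    batch_df.selectExpr("CAST(value AS STRING) AS value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("topic", "processed-topic") \
        .save()

    # GCP bucket sink - assumes the GCS connector is available to the cluster
    batch_df.write.mode("append").parquet("gs://some-bucket/output/")

query = df_stream.writeStream \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "gs://some-bucket/checkpoints/") \
    .start()
```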
I'm consistently seeing log messages like the following:
```
22/09/06 16:55:03 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer clientId=consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1,
groupId=spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0]
Member consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1-604d740f-16d1-46b3-955c-502be5b02be1
sending LeaveGroup request to coordinator 35.237.40.54:9094 (id:
2147483645 rack: null) due to consumer poll timeout has expired. This
means the time between subsequent calls to poll() was longer than the
configured max.poll.interval.ms, which typically implies that the poll
loop is spending too much time processing messages. You can address
this either by increasing max.poll.interval.ms or by reducing the
maximum size of batches returned in poll() with max.poll.records.
```
I'm trying to change the values of the parameters max.poll.interval.ms and
max.poll.records in spark.readStream (shown below):
```
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .option("kafka.max.poll.interval.ms", 600000) \
    .option("kafka.max.poll.records", 7000) \
    .load()
```
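For reference, the same settings can also be grouped into a dict and passed
with .options(); anything prefixed with "kafka." should be handed through to
the underlying consumer config. This is just an equivalent form of the code
above with the SSL options elided, not a fix:
```
kafka_opts = {
    "kafka.bootstrap.servers": kafkaBrokers,
    "subscribe": topic,
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
    "kafka.metadata.max.age.ms": "1000",
    # consumer properties to be forwarded need the "kafka." prefix
    "kafka.max.poll.interval.ms": "600000",
    "kafka.max.poll.records": "7000",
}

df_stream = spark.readStream.format("kafka") \
    .options(**kafka_opts) \
    .load()
```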
The values that I see in the KafkaConsumer config are:
max.poll.interval.ms - 600000 (changed from 300000)
max.poll.records - not getting changed, it is showing as 1
```
22/09/07 02:29:32 INFO
org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig
values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [34.138.213.152:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id =
consumer-spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 600000
max.poll.records = 1
metadata.max.age.ms = 1000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = syslog-vani-noacl.p12
ssl.keystore.password = [hidden]
ssl.keystore.type = PKCS12
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = versa-kafka-gke-ca.p12
ssl.truststore.password = [hidden]
ssl.truststore.type = PKCS12
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
```
The Kafka documentation states that the default value of max.poll.records is
500.
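As a sanity check, a plain consumer outside Spark accepts the same overrides
(a sketch using kafka-python; the topic and broker below are placeholders and
the SSL options are omitted):
```
from kafka import KafkaConsumer  # pip install kafka-python

# kafka-python takes the same consumer settings as snake_case kwargs;
# max_poll_records defaults to 500 here as well
consumer = KafkaConsumer(
    "my-topic",                            # placeholder topic
    bootstrap_servers="broker-host:9094",  # placeholder broker
    max_poll_records=7000,
    max_poll_interval_ms=600000,
)
```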
So, a few questions:
How do I change the value of max.poll.records to the desired value? And why
is it showing as 1 instead of the default value of 500?
Here is the stackoverflow link as well:
https://stackoverflow.com/questions/73630738/spark-structured-streaming-unable-to-change-max-poll-records-showing-as-1
tia!