Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/09/07 06:14:00 UTC

Spark Structured Streaming - unable to change max.poll.records (showing as 1)

Hello All,

I have a Spark Structured Streaming job that reads from Kafka, does
processing, and writes the data to Mongo/Kafka/GCP buckets (i.e. it is
processing-heavy).

I'm consistently seeing the following warnings:

```

22/09/06 16:55:03 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator:
[Consumer clientId=consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1,
groupId=spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0]
Member consumer-spark-kafka-source-4e7e7f32-19ab-44d5-99f5-59fb5a462af2-594190416-driver-0-1-604d740f-16d1-46b3-955c-502be5b02be1
sending LeaveGroup request to coordinator 35.237.40.54:9094 (id:
2147483645 rack: null) due to consumer poll timeout has expired. This
means the time between subsequent calls to poll() was longer than the
configured max.poll.interval.ms, which typically implies that the poll
loop is spending too much time processing messages. You can address
this either by increasing max.poll.interval.ms or by reducing the
maximum size of batches returned in poll() with max.poll.records.

```
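
For reference, the two settings the message points at are plain Kafka consumer
properties. Here is a minimal standalone sketch (not my actual job; it assumes
the kafka-python client, and the broker address and topic are placeholders)
just to illustrate the two knobs the log is talking about:

```
# Minimal illustration of the two consumer properties named in the log above.
# Broker address and topic are placeholders; a reachable cluster is assumed.
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "some-topic",                     # hypothetical topic
    bootstrap_servers="broker:9094",  # hypothetical broker
    max_poll_interval_ms=600000,      # max allowed gap between poll() calls (default 300000)
    max_poll_records=500,             # max records returned per poll() (default 500)
)

for record in consumer:
    # If per-record processing here is slow enough that the next poll() happens
    # later than max.poll.interval.ms, the consumer is removed from the group,
    # which is what the LeaveGroup message above describes.
    pass
```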

I'm trying to change the values of the parameters max.poll.interval.ms and
max.poll.records in spark.readStream (shown below):

```

df_stream = spark.readStream.format('kafka') \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password) \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .option("kafka.metadata.max.age.ms", "1000") \
        .option("kafka.ssl.keystore.type", "PKCS12") \
        .option("kafka.ssl.truststore.type", "PKCS12") \
        .option("kafka.max.poll.interval.ms", 600000) \
        .option("kafka.max.poll.records", 7000) \
        .load()

```
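
For completeness, the query is started with a writeStream along these lines
(sketch only; the console sink and checkpoint path here are placeholders, not
my real sinks), which is when the ConsumerConfig values further below get
logged:

```
# Hypothetical sink, used here only to show the query being started; the real
# job writes to Mongo/Kafka/GCP buckets. The checkpoint path is a placeholder.
query = (
    df_stream.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/max-poll-records-test")
    .start()
)
query.awaitTermination()
```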

The values that I see in the KafkaConsumer are:

- max.poll.interval.ms: 600000 (changed from 300000)
- max.poll.records: not getting changed, it is still showing as 1

```

22/09/07 02:29:32 INFO
org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig
values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [34.138.213.152:9094]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id =
consumer-spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = spark-kafka-source-17ac0d19-f30c-4db7-91b9-1dbe9172829e-594190416-driver-0
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 600000
    max.poll.records = 1
    metadata.max.age.ms = 1000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = syslog-vani-noacl.p12
    ssl.keystore.password = [hidden]
    ssl.keystore.type = PKCS12
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = versa-kafka-gke-ca.p12
    ssl.truststore.password = [hidden]
    ssl.truststore.type = PKCS12
    value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer

```

The Kafka documentation states that the default value of max.poll.records is
500.
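
As a quick sanity check of that default, independent of Spark (this assumes the
kafka-python client, which exposes its consumer defaults via
KafkaConsumer.DEFAULT_CONFIG and mirrors the Java consumer's documented 500):

```
# Sanity check of the documented default using kafka-python's built-in defaults
# (no broker connection needed). Illustration only; the Spark connector
# configures its own driver-side consumer separately.
from kafka import KafkaConsumer

print(KafkaConsumer.DEFAULT_CONFIG["max_poll_records"])  # expected: 500
```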

So, a few questions:

How do I change the value of max.poll.records to the desired value? Also,
why is the value showing as 1 instead of the default value of 500?


Here is the Stack Overflow link as well:

https://stackoverflow.com/questions/73630738/spark-structured-streaming-unable-to-change-max-poll-records-showing-as-1


Thanks in advance!