Posted to user@spark.apache.org by Abhishek Singla <ab...@gmail.com> on 2023/04/27 14:39:02 UTC

config: minOffsetsPerTrigger not working

Hi Team,

I am using Spark Structured Streaming to read from Kafka and write to S3.

Spark Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

kafka.conf = {
               "kafka.bootstrap.servers": "localhost:9092",
               "subscribe": "test-topic",
               "minOffsetsPerTrigger": 10000000,
               "maxOffsetsPerTrigger": 11000000,
               "maxTriggerDelay": "15m",
               "groupIdPrefix": "test",
               "startingOffsets": "latest",
               "includeHeaders": true,
               "failOnDataLoss": false
              }

spark.conf = {
               "spark.master": "spark://localhost:7077",
               "spark.app.name": "app",
               "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
               "spark.sql.streaming.metricsEnabled": true
             }


But these configs do not seem to take effect. My understanding of the docs is
that Spark should wait until at least 10,000,000 new offsets are available
(or 15 minutes pass, per maxTriggerDelay) before firing a micro-batch, yet I
can see Spark processing batches of 3k-15k records immediately one after
another. Is there something I am missing?

Ref:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Regards,
Abhishek Singla

Re: config: minOffsetsPerTrigger not working

Posted by Abhishek Singla <ab...@gmail.com>.
Thanks, Mich, for acknowledging.

Yes, I am providing the checkpoint path; I omitted it from the code snippet
here.

I believe this is due to the Spark version (3.1.x): minOffsetsPerTrigger and
maxTriggerDelay were only added in Spark 3.2.0, so on 3.1.x they are silently
ignored.
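
Until we upgrade, a rough workaround I am considering is a fixed
processing-time trigger: it at least spaces micro-batches out, although it
cannot enforce a minimum batch size. A sketch (the interval and checkpoint
path are placeholders):

import org.apache.spark.sql.streaming.Trigger;

// Spark 3.1.x ignores minOffsetsPerTrigger, so the closest substitute is a
// fixed trigger interval; each micro-batch then picks up whatever offsets
// accumulated in the meantime, still capped by maxOffsetsPerTrigger
// (which 3.1.x does honor).
df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .option("checkpointLocation", checkpointPath) // placeholder path
    .trigger(Trigger.ProcessingTime("5 minutes")) // placeholder interval
    .start()
    .awaitTermination();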

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Is this all of your writeStream?
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .start()
>     .awaitTermination();
>
> What happened to the checkpoint location?
>
> option('checkpointLocation', checkpoint_path).

Re: config: minOffsetsPerTrigger not working

Posted by Mich Talebzadeh <mi...@gmail.com>.
Is this all of your writeStream?

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

What happened to the checkpoint location?

option('checkpointLocation', checkpoint_path)

Example:

checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"


ls -l  /ssd/hduser/MDBatchBQ/chkpt
total 24
-rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits

so you can see what is going on.
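
For the Java pipeline above, setting it would look something like this (a
sketch; the S3 path is only an example):

import org.apache.spark.sql.streaming.StreamingQuery;

// The checkpoint location persists stream progress (offsets, commits,
// metadata) so the query can resume where it left off after a restart.
// Any durable path works: HDFS, S3, or a local directory for testing.
StreamingQuery query = df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/kafka-to-s3") // example path
    .start();
query.awaitTermination();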

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


View my LinkedIn profile:
https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/

https://en.everybodywiki.com/Mich_Talebzadeh


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



