Posted to user@spark.apache.org by Kristopher Kane <kk...@gmail.com> on 2019/03/01 15:10:06 UTC

Spark 2.4 Structured Streaming Kafka assign API polling same offsets

We are using the assign API to do batch work with Spark and Kafka.
What I'm seeing is that the Spark executors keep working in the
background, polling the same data over and over until the main thread
commits the offsets.

Is the call below a blocking operation?

  Dataset<Row> df = spark.read().format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      // "assign" takes a JSON map of topic to partition list;
      // a comma-separated topic list belongs to "subscribe".
      .option("assign", "{\"topic1\":[0,1],\"topic2\":[0]}")
      .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
      .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
      .load();
  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
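
The escaped JSON passed to startingOffsets and endingOffsets is easy to
get wrong by hand. As a rough sketch only, a small helper could render
those strings from a map. OffsetJson and everything in it are
hypothetical names, not part of Spark. The sketch assumes that -2 means
"earliest" and -1 means "latest" in these options, and that topic names
need no JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Spark) that renders per-partition offsets as JSON text.
final class OffsetJson {
    static final long EARLIEST = -2L; // assumed meaning in the Kafka source options: earliest
    static final long LATEST = -1L;   // assumed meaning in the Kafka source options: latest

    // Renders {topic -> {partition -> offset}} in the shape used by startingOffsets/endingOffsets.
    static String render(Map<String, Map<Integer, Long>> offsets) {
        StringJoiner topics = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Map<Integer, Long>> topic : offsets.entrySet()) {
            StringJoiner parts = new StringJoiner(",", "{", "}");
            for (Map.Entry<Integer, Long> p : topic.getValue().entrySet()) {
                // Partition numbers are JSON keys, so they are quoted; offsets are bare numbers.
                parts.add("\"" + p.getKey() + "\":" + p.getValue());
            }
            topics.add("\"" + topic.getKey() + "\":" + parts);
        }
        return topics.toString();
    }

    public static void main(String[] args) {
        Map<Integer, Long> topic1 = new LinkedHashMap<>();
        topic1.put(0, 23L);
        topic1.put(1, EARLIEST);
        Map<Integer, Long> topic2 = new LinkedHashMap<>();
        topic2.put(0, EARLIEST);

        Map<String, Map<Integer, Long>> starting = new LinkedHashMap<>();
        starting.put("topic1", topic1);
        starting.put("topic2", topic2);

        // prints {"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}
        System.out.println(render(starting));
    }
}
```

The rendered string could then be passed as the value of
.option("startingOffsets", ...) in place of the hand-escaped literal.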


###########################################

Here is an example.  Our desired batch is 20 records to commit on.
Due to segment size (this is a test), 12 records are returned in each
poll. Spark gets to offset 20 and our program is working to
filter/process/commit, but the Spark polling starts again in the
background at offset -2, since offset 20 has not been committed yet.
This suggests that the .read().load() call above is non-blocking.


2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
Getting data from Kafka
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
polling

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 1 requested 1
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 2 requested 2
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 3 requested 3
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 4 requested 4
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 5 requested 5
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 6 requested 6
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 7 requested 7
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 8 requested 8
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 9 requested 9
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 10 requested 10
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 11 requested 11
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 12 requested 12

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Offset changed from 12 to 24 after
polling

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 13 requested 13
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 14 requested 14
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 15 requested 15
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 16 requested 16
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 17 requested 17
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 18 requested 18
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 19 requested 19

2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:255 -
Offset to commit from actually pulled data matches our range for
partition 0
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:264 -
Original offsets to commit for partitions:
{compacted-gap-message-0=20}
2019-03-01 09:21:46 DEBUG [THREAD ID=main] KafkaOperations:265 -
Modified offsets to commit for partitions:
{compacted-gap-message-0=20}

###################### Main thread is working to commit offset 20, but
Spark has moved on and is reading the same range again, since the
offset has not been committed yet:
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Get
spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
compacted-gap-message-0 0
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-66226b26-a099-4e58-bce9-ca373ab16d62-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:46 DEBUG [THREAD ID=Executor task launch worker for
task 201] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
polling

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark 2.4 Structured Streaming Kafka assign API polling same offsets

Posted by Kristopher Kane <kk...@gmail.com>.
I figured out why.  We were not persisting the data returned by
.load().  Because of that, every operation like count() goes back to
Kafka and reads the data again.
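
One rough way to picture the effect without Spark is the plain-Java
model below. It is only an illustration: LazyReadDemo and all of its
members are hypothetical, and the "source" is a counter rather than
Kafka. An unpersisted supplier re-runs the read for every action, while
a wrapper that keeps the first result runs it once. In Spark, the
corresponding step would be calling persist() or cache() on the Dataset
returned by load() before running several actions on it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazily evaluated dataset; this is not Spark code.
final class LazyReadDemo {

    // Counts how often the simulated source read actually runs.
    static final AtomicInteger fetches = new AtomicInteger();

    // Simulates reading the same offset range from the source again.
    static List<Integer> fetchFromSource() {
        fetches.incrementAndGet();
        return Arrays.asList(0, 1, 2, 3);
    }

    // Keeps the first result and reuses it, which is the role persisting plays.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean loaded;

            @Override
            public synchronized T get() {
                if (!loaded) {
                    cached = source.get();
                    loaded = true;
                }
                return cached;
            }
        };
    }

    // Runs two "actions" (a count and a sum) and returns how many reads they triggered.
    static int readsForTwoActions(Supplier<List<Integer>> data) {
        fetches.set(0);
        int count = data.get().size();
        int sum = data.get().stream().mapToInt(Integer::intValue).sum();
        return fetches.get();
    }

    public static void main(String[] args) {
        System.out.println("unpersisted reads: "
                + readsForTwoActions(LazyReadDemo::fetchFromSource));
        System.out.println("persisted reads: "
                + readsForTwoActions(persisted(LazyReadDemo::fetchFromSource)));
    }
}
```

In this model the unpersisted case triggers two reads for two actions
and the persisted case triggers one, which matches the repeated
"Seeking to ... 0" entries in the log above.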

