Posted to issues@spark.apache.org by "Nick Hryhoriev (Jira)" <ji...@apache.org> on 2021/03/13 09:23:00 UTC

[jira] [Comment Edited] (SPARK-31427) Spark Structure streaming read data twice per every micro-batch.

    [ https://issues.apache.org/jira/browse/SPARK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300773#comment-17300773 ] 

Nick Hryhoriev edited comment on SPARK-31427 at 3/13/21, 9:22 AM:
------------------------------------------------------------------

I checked Spark 3, and it is the same.
Maybe in the case of repartitioning by range you are right.
But the same issue happens with sort and repartition.
{code:java}
package org.apache.spark.af.it

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object TestKafka {

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()

    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka-20001-026-prod.eu1.appsflyer.com:9092")
      .option("assign", """{"sessions":[0,1]}""")
      .load()
      .writeStream
      .foreachBatch(process _)
      .start()
      .awaitTermination(1000000)
  }

  private def process(data: Dataset[Row], batchId: Long)(implicit spark: SparkSession): Unit = {
    import spark.implicits._
    spark.sparkContext.setJobDescription(s"session [parquet] - $batchId")
    data
      .sort($"partition", $"offset")
      .repartition(4, $"offset")
      .show(5)
  }
}
{code}


was (Author: hryhoriev.nick):
I checked Spark 3, and it is the same.
Maybe in the case of repartitioning by range you are right.
But the same issue happens with sort and repartition.
```

package org.apache.spark.af.it

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object TestKafka {

 def main(args: Array[String]): Unit = {
 implicit val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()

 spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "kafka-20001-026-prod.eu1.appsflyer.com:9092")
 .option("assign", """\{"sessions":[0,1] }""")
 .load()
 .writeStream
 .foreachBatch(process _)
 .start()
 .awaitTermination(1000000)

 }

 private def process(data: Dataset[Row], batchId: Long)(implicit spark: SparkSession): Unit = {
 import spark.implicits._
 spark.sparkContext.setJobDescription(s"session [parquet] - $batchId")
 data
 .sort($"partition", $"offset")
 .repartition(4, $"offset")
 .show(5)
 }

}
```

> Spark Structure streaming read data twice per every micro-batch.
> ----------------------------------------------------------------
>
>                 Key: SPARK-31427
>                 URL: https://issues.apache.org/jira/browse/SPARK-31427
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: Nick Hryhoriev
>            Priority: Major
>
> I have a very strange issue with Spark Structured Streaming. It creates two Spark jobs for every micro-batch and, as a result, reads the data from Kafka twice. Here is a simple code snippet.
>  
> {code:java}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.streaming.Trigger
> import scala.concurrent.duration._
> object CheckHowSparkReadFromKafka {
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder()
>       .config(new SparkConf()
>         .setAppName(s"simple read from kafka with repartition")
>         .setMaster("local[*]")
>         .set("spark.driver.host", "localhost"))
>       .getOrCreate()
>     val testPath = "/tmp/spark-test"
>     FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
>     import session.implicits._
>     val stream = session
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
>       .option("subscribe", "topic")
>       .option("maxOffsetsPerTrigger", 1000)
>       .option("failOnDataLoss", false)
>       .option("startingOffsets", "latest")
>       .load()
>       .repartitionByRange($"offset")
>       .writeStream
>       .option("path", testPath + "/data")
>       .option("checkpointLocation", testPath + "/checkpoint")
>       .format("parquet")
>       .trigger(Trigger.ProcessingTime(10.seconds))
>       .start()
>     stream.processAllAvailable()
>   }
> }
> {code}
> This happens because of {{.repartitionByRange($"offset")}}: if I remove this line, all is good. But with it, Spark creates two jobs per micro-batch, one with a single stage that just reads from Kafka, and a second with three stages: read -> shuffle -> write. So the result of the first job is never used.
> This has a significant impact on performance. Some of my Kafka topics have 1550 partitions, so reading them twice is a big deal. If I add a cache, things get better (see the sketch after this description), but that is not an option for me. In local mode, the first job in each batch takes less than 0.1 ms, except the batch with index 0. But on a YARN cluster and on Mesos, both jobs fully execute, and on my topics they take near 1.2 min.
>  
>  
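
For reference, a minimal sketch of the cache workaround mentioned in the description above, assuming the direct parquet sink is rewritten as a {{foreachBatch}} sink; the object name, method name, and output path below are illustrative, not taken from the reporter's job:

{code:java}
import org.apache.spark.sql.DataFrame

object CacheWorkaroundSketch {

  // Called once per micro-batch via .writeStream.foreachBatch(writeBatch _).
  def writeBatch(batch: DataFrame, batchId: Long): Unit = {
    val spark = batch.sparkSession
    import spark.implicits._

    // Materialize the micro-batch once, so the extra job triggered by
    // repartitionByRange reuses the cached rows instead of re-reading Kafka.
    val cached = batch.persist()
    try {
      cached
        .repartitionByRange($"offset")
        .write
        .mode("append")
        .parquet("/tmp/spark-test/data") // illustrative path
    } finally {
      cached.unpersist()
    }
  }
}
{code}

Whether this is acceptable depends on the job; as noted above, caching is not an option for the reporter, so this is only a mitigation sketch, not a fix for the duplicate read itself.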



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org