Posted to issues@spark.apache.org by "Vikash Kumar (JIRA)" <ji...@apache.org> on 2019/01/22 16:37:00 UTC

[jira] [Updated] (SPARK-26691) WholeStageCodegen after InMemoryTableScan task takes significant time and time increases based on the input size

     [ https://issues.apache.org/jira/browse/SPARK-26691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vikash Kumar updated SPARK-26691:
---------------------------------
    Summary: WholeStageCodegen after InMemoryTableScan task takes significant time and time increases based on the input size  (was: WholeStageCodegen after InMemoryTableScan task takes more time and time increases based on the input size)

> WholeStageCodegen after InMemoryTableScan task takes significant time and time increases based on the input size
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26691
>                 URL: https://issues.apache.org/jira/browse/SPARK-26691
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Vikash Kumar
>            Priority: Major
>         Attachments: DataScale_LkpPolicy_FirstRow_SF1_JobDetail.png, DataScale_LkpPolicy_FirstRow_SF50_JobDetail.png, WholeStageCodegen.PNG
>
>
> Scenario: I am doing a left outer join between a streaming dataframe and a static dataframe and writing the result to a Kafka target. The static dataframe is created from a Hive source and the streaming dataframe is created from a Kafka source, and the two dataframes are joined on an equality condition. Here is a sample program.
>  
> {code:java}
> package com.spark.exec
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming._
> import org.apache.spark.sql.types._
> import org.apache.spark.storage.StorageLevel
> import java.util.UUID.randomUUID
>
> object Spark0 {
>   def main(args: Array[String]): Unit = {
>     val sqlContext = SparkSession.builder().enableHiveSupport().getOrCreate()
>     import sqlContext.implicits._
>
>     // Streaming dataframe: read the Kafka "source" topic.
>     val v1 = sqlContext.readStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:49092")
>       .option("subscribe", "source")
>       .load()
>
>     // Parse the Kafka value column as JSON into (id, name).
>     val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true)))
>     val stream = v1.selectExpr("cast (value as string) as json")
>       .select(from_json($"json", schema) as "data")
>       .select("data.*")
>
>     // Static dataframe: Hive lookup table, repartitioned and persisted.
>     val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
>       .repartition(18)
>       .persist(StorageLevel.MEMORY_AND_DISK)
>       .toDF
>
>     // Left outer join of the streaming side with the static lookup.
>     val joinDF = stream.join(static, stream.col("id").equalTo(static.col("id")), "left_outer")
>     val result = joinDF.selectExpr("to_json(struct(*)) AS value")
>
>     // Write the joined rows to the Kafka "target" topic every 20 seconds.
>     val checkpoint = "/tmp/" + randomUUID().toString
>     result.writeStream.format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:49092")
>       .option("topic", "target")
>       .options(Map("batch.size" -> "16384", "metadata.fetch.timeout.ms" -> "10000", "linger.ms" -> "1000"))
>       .option("checkpointLocation", checkpoint)
>       .trigger(Trigger.ProcessingTime(20000L))
>       .start()
>
>     sqlContext.streams.active.foreach(q => q.awaitTermination())
>   }
> }
> {code}
>  
> The repartition and persist functions are applied to the static dataframe:
> {code:java}
> val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
>   .repartition(18)
>   .persist(StorageLevel.MEMORY_AND_DISK)
>   .toDF
> {code}
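> As a sanity check (a small sketch, not part of the original job), the reported storage level and partition count can be printed before the streaming query starts to confirm the persist and repartition actually took effect:
> {code:java}
> // Confirm the lookup dataframe is marked for caching with the expected
> // storage level and has the expected number of partitions.
> println(static.storageLevel)          // expect memory-and-disk
> println(static.rdd.getNumPartitions)  // expect 18 after repartition(18)
> {code}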
> What I observed in the Spark UI is that the WholeStageCodegen stage after the InMemoryTableScan task takes a significant amount of time in every batch, which degrades performance, and the time increases with the size of the dataset in the Hive source (the static dataframe), even though we have already persisted the data after repartitioning. What is WholeStageCodegen doing here that takes a significant amount of time proportional to the Hive source dataset? Is this happening by design?
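> To see exactly which operators end up inside that WholeStageCodegen stage, the physical plan of the join can be printed with the standard explain API; a sketch:
> {code:java}
> // Print the parsed/analyzed/optimized/physical plans for the join.
> // The physical plan shows the InMemoryTableScan node and which operators
> // are wrapped in WholeStageCodegen around it.
> joinDF.explain(true)
> {code}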
> The expectation is that once we have repartitioned and persisted the dataframe in memory or on disk, we should only need to read the data from memory and pass it to the join.
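> One experiment worth noting (a sketch, not something the job above already does): persist() is lazy, so forcing the cache to materialize once before the streaming query starts should mean each micro-batch only has to read the cached blocks instead of rebuilding them:
> {code:java}
> // Hypothetical tweak: trigger the Hive scan, repartition(18) and caching
> // once, up front, so later micro-batches read from the cache only.
> static.count()
> {code}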
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org