Posted to user@spark.apache.org by Kamalanathan Venkatesan <Ka...@in.ey.com> on 2019/07/09 13:54:48 UTC

Spark Structured Streaming sinks output late

Hello,

I have the Spark Structured Streaming code below, and I expected the results to be printed to the console every 10 seconds. Instead, the console sink only produces output every ~2 minutes or longer.
What could be the issue?

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode

class SparkExecutor {
  def streaming(): Unit = {
    System.setProperty("hadoop.home.dir", "/Documents/ ")
    val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
    conf.set("spark.eventLog.enabled", "false")
    // SparkSession is the single entry point; no separate SparkContext or SQLContext is needed.
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Enables the $"column" syntax used below.
    import spark.implicits._

    // Subscribe to the "wonderful" Kafka topic, starting from the latest offsets.
    val inputDf = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "wonderful")
      .option("startingOffsets", "latest")
      .load()

    // Count records per 10-second event-time window, with a 500 ms watermark.
    val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
      .withWatermark("timestamp", "500 milliseconds")
      .groupBy(window($"timestamp", "10 seconds"))
      .count()

    // Print to the console in update mode (only windows whose counts changed in the batch).
    val consoleOutput = personJsonDf.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Update())
      .start()
    consoleOutput.awaitTermination()
  }
}

object SparkExecutor {
  val spE: SparkExecutor = new SparkExecutor()
  def main(args: Array[String]): Unit = {
    println("test")
    spE.streaming()
  }
}
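
For context on the 10-second expectation: the duration passed to window(...) controls how events are grouped by event time, while how often the sink fires is governed by the query's trigger. With no trigger set, Spark starts each micro-batch as soon as the previous one finishes. The sketch below shows one way an explicit 10-second processing-time trigger could be set. It is only an illustration under assumptions, not a confirmed fix for the delay described above: the built-in "rate" source is swapped in for Kafka so the example is self-contained, and the object name TriggerSketch, the local[2] master, and the reduced spark.sql.shuffle.partitions value are hypothetical choices made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Hypothetical standalone example; the names here are not from the original post.
object TriggerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TriggerSketch")
      .master("local[2]")
      // Assumption: a value below the default of 200 reduces the number of
      // tasks each micro-batch schedules for the aggregation in local mode.
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    import spark.implicits._

    // The built-in "rate" source stands in for Kafka here.
    // It produces rows with the columns (timestamp, value).
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Same aggregation shape as in the post: counts per 10-second window.
    val counts = input
      .withWatermark("timestamp", "500 milliseconds")
      .groupBy(window($"timestamp", "10 seconds"))
      .count()

    // Request a micro-batch every 10 seconds of processing time.
    val query = counts.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()
  }
}
```

If a batch takes longer than the trigger interval, the next batch starts only after it completes, so the batch durations reported in the streaming query progress logs are worth checking alongside the trigger setting.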

The information contained in this communication is intended solely for the use of the individual or entity to whom it is addressed and others authorized to receive it. It may contain confidential or legally privileged information. If you are not the intended recipient you are hereby notified that any disclosure, copying, distribution or taking any action in reliance on the contents of this information is strictly prohibited and may be unlawful. If you have received this communication in error, please notify us immediately by responding to this email and then delete it from your system. The firm is neither liable for the proper and complete transmission of the information contained in this communication nor for any delay in its receipt.

Re: Spark Structured Streaming sinks output late

Posted by Siva Samraj <sa...@gmail.com>.
Yes, I am facing the same issue. Did you figure it out?
