Posted to issues@spark.apache.org by "Amit Assudani (JIRA)" <ji...@apache.org> on 2017/07/28 20:28:00 UTC

[jira] [Updated] (SPARK-21565) aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp

     [ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Assudani updated SPARK-21565:
----------------------------------
    Description: 
*Short Description:*

An aggregation query fails when the watermark is defined on the eventTime column from the source data, but the same query works when the watermark is defined on a newTimeStamp column generated by running SQL with current_timestamp.
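
The only difference between the failing and the working paths is which column carries the watermark. A condensed extract of the full repro further below (dfNewEvents is the streaming source registered as a temp view; the val names here are only illustrative - the repro uses dfNewEvents2 in both cases and then runs the same group-by aggregation):

// Fails: watermark on the eventTime column coming from the source data
val byEventTime = sparkSession.sql("select * from dfNewEvents")
  .withWatermark("eventTime", "2 seconds")

// Works: watermark on a processing-time column added via current_timestamp
val byCurrentTs = sparkSession.sql("select *, current_timestamp as newTimeStamp from dfNewEvents")
  .withWatermark("newTimeStamp", "2 seconds")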

*Exception:*

Caused by: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:347)
	at scala.None$.get(Option.scala:345)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

*Code to replicate:*

package test

import java.nio.file.{Files, Path, Paths}
import java.text.SimpleDateFormat

import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object Test1 {

  def main(args: Array[String]) {

    val sparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val checkpointPath = "target/cp1"
    val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
    delete(newEventsPath)
    delete(Paths.get(checkpointPath).toAbsolutePath)
    Files.createDirectories(newEventsPath)


    val dfNewEvents = newEvents(sparkSession)
    dfNewEvents.createOrReplaceTempView("dfNewEvents")

    //The below works - Start
//    val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
//    dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
//    val groupEvents = sparkSession.sql("select symbol,newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
    // End
    
    
    //The below doesn't work - Start
    val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents ").withWatermark("eventTime","2 seconds")
     dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
      val groupEvents = sparkSession.sql("select symbol,eventTime, count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
    // - End
    
    
    val query1 = groupEvents.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", checkpointPath)
      .start("./myop")

    val newEventFile1 = newEventsPath.resolve("eventNew1.json")
    Files.write(newEventFile1, List(
      """{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
      """{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
    ).toIterable.asJava)
    query1.processAllAvailable()

    sparkSession.streams.awaitAnyTermination(10000)

  }

  private def newEvents(sparkSession: SparkSession) = {
    val newEvents = Paths.get("target/newEvents/").toAbsolutePath
    delete(newEvents)
    Files.createDirectories(newEvents)

    val dfNewEvents = sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2 seconds")
    dfNewEvents
  }

  private val eventsSchema = StructType(List(
    StructField("symbol", StringType, true),
    StructField("price", DoubleType, true),
    StructField("eventTime", TimestampType, false)
  ))

  private def delete(dir: Path) = {
    if(Files.exists(dir)) {
      Files.walk(dir).iterator().asScala.toList
        .map(p => p.toFile)
        .sortWith((o1, o2) => o1.compareTo(o2) > 0)
        .foreach(_.delete)
    }
  }

}
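
For reference, a minimal build definition that should be enough to compile and run the repro above. This is a sketch under assumptions: only the Spark version (2.2.0, from Affects Versions) comes from this report; the project name and the Scala version (2.11, which Spark 2.2.0 is built against) are assumptions.

// build.sbt (hypothetical, just for running the repro)
name := "spark-21565-repro"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"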




> aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21565
>                 URL: https://issues.apache.org/jira/browse/SPARK-21565
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Amit Assudani
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
