You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Shixiong Zhu (JIRA)" <ji...@apache.org> on 2017/08/02 18:13:00 UTC
[jira] [Commented] (SPARK-21565) aggregate query fails with
watermark on eventTime but works with watermark on timestamp column
generated by current_timestamp
[ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111451#comment-16111451 ]
Shixiong Zhu commented on SPARK-21565:
--------------------------------------
Thanks for reporting it. I can reproduce the error in a unit test. Still investigating it.
> aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-21565
> URL: https://issues.apache.org/jira/browse/SPARK-21565
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Amit Assudani
>
> *Short Description:*
> The aggregation query fails with eventTime as the watermark column, while it works with a newTimeStamp column generated by running SQL with current_timestamp.
> *Exception:*
> {code}
> Caused by: java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347)
> at scala.None$.get(Option.scala:345)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
> at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
> at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
> at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> {code}
> *Code to replicate:*
> {code}
> package test
> import java.nio.file.{Files, Path, Paths}
> import java.text.SimpleDateFormat
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{SparkSession}
> import scala.collection.JavaConverters._
> object Test1 {
> def main(args: Array[String]) {
> val sparkSession = SparkSession
> .builder()
> .master("local[*]")
> .appName("Spark SQL basic example")
> .config("spark.some.config.option", "some-value")
> .getOrCreate()
> val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
> val checkpointPath = "target/cp1"
> val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEventsPath)
> delete(Paths.get(checkpointPath).toAbsolutePath)
> Files.createDirectories(newEventsPath)
> val dfNewEvents= newEvents(sparkSession)
> dfNewEvents.createOrReplaceTempView("dfNewEvents")
> //The below works - Start
> // val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
> // dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> // val groupEvents = sparkSession.sql("select symbol,newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
> // End
>
>
> //The below doesn't work - Start
> val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents ").withWatermark("eventTime","2 seconds")
> dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> val groupEvents = sparkSession.sql("select symbol,eventTime, count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
> // - End
>
>
> val query1 = groupEvents.writeStream
> .outputMode("append")
> .format("console")
> .option("checkpointLocation", checkpointPath)
> .start("./myop")
> val newEventFile1=newEventsPath.resolve("eventNew1.json")
> Files.write(newEventFile1, List(
> """{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
> """{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
> ).toIterable.asJava)
> query1.processAllAvailable()
> sparkSession.streams.awaitAnyTermination(10000)
> }
> private def newEvents(sparkSession: SparkSession) = {
> val newEvents = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEvents)
> Files.createDirectories(newEvents)
> val dfNewEvents = sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2 seconds")
> dfNewEvents
> }
> private val eventsSchema = StructType(List(
> StructField("symbol", StringType, true),
> StructField("price", DoubleType, true),
> StructField("eventTime", TimestampType, false)
> ))
> private def delete(dir: Path) = {
> if(Files.exists(dir)) {
> Files.walk(dir).iterator().asScala.toList
> .map(p => p.toFile)
> .sortWith((o1, o2) => o1.compareTo(o2) > 0)
> .foreach(_.delete)
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org