You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Amit Assudani (JIRA)" <ji...@apache.org> on 2017/07/28 20:28:00 UTC
[jira] [Updated] (SPARK-21565) aggregate query fails with watermark
on eventTime but works with watermark on timestamp column generated by
current_timestamp
[ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amit Assudani updated SPARK-21565:
----------------------------------
Description:
*Short Description:*
Aggregation query fails when eventTime is used as the watermark column, but works with a newTimeStamp column generated by running SQL with current_timestamp.
*Exception:*
Caused by: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
*Code to replicate:*
package test
import java.nio.file.{Files, Path, Paths}
import java.text.SimpleDateFormat
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession}
import scala.collection.JavaConverters._
object Test1 {
def main(args: Array[String]) {
val sparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val checkpointPath = "target/cp1"
val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
delete(newEventsPath)
delete(Paths.get(checkpointPath).toAbsolutePath)
Files.createDirectories(newEventsPath)
val dfNewEvents= newEvents(sparkSession)
dfNewEvents.createOrReplaceTempView("dfNewEvents")
//The below works - Start
// val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
// dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
// val groupEvents = sparkSession.sql("select symbol,newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
// End
//The below doesn't work - Start
val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents ").withWatermark("eventTime","2 seconds")
dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
val groupEvents = sparkSession.sql("select symbol,eventTime, count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
// - End
val query1 = groupEvents.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", checkpointPath)
.start("./myop")
val newEventFile1=newEventsPath.resolve("eventNew1.json")
Files.write(newEventFile1, List(
"""{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
"""{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
).toIterable.asJava)
query1.processAllAvailable()
sparkSession.streams.awaitAnyTermination(10000)
}
private def newEvents(sparkSession: SparkSession) = {
val newEvents = Paths.get("target/newEvents/").toAbsolutePath
delete(newEvents)
Files.createDirectories(newEvents)
val dfNewEvents = sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2 seconds")
dfNewEvents
}
private val eventsSchema = StructType(List(
StructField("symbol", StringType, true),
StructField("price", DoubleType, true),
StructField("eventTime", TimestampType, false)
))
private def delete(dir: Path) = {
if(Files.exists(dir)) {
Files.walk(dir).iterator().asScala.toList
.map(p => p.toFile)
.sortWith((o1, o2) => o1.compareTo(o2) > 0)
.foreach(_.delete)
}
}
}
was:
Aggregation query fails when eventTime is used as the watermark column, but works with a newTimeStamp column generated by running SQL with current_timestamp.
Exception:
Caused by: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
Code to replicate:
package test
import java.nio.file.{Files, Path, Paths}
import java.text.SimpleDateFormat
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SparkSession}
import scala.collection.JavaConverters._
object Test1 {
def main(args: Array[String]) {
val sparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val checkpointPath = "target/cp1"
val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
delete(newEventsPath)
delete(Paths.get(checkpointPath).toAbsolutePath)
Files.createDirectories(newEventsPath)
val dfNewEvents= newEvents(sparkSession)
dfNewEvents.createOrReplaceTempView("dfNewEvents")
//The below works - Start
// val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
// dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
// val groupEvents = sparkSession.sql("select symbol,newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
// End
//The below doesn't work - Start
val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents ").withWatermark("eventTime","2 seconds")
dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
val groupEvents = sparkSession.sql("select symbol,eventTime, count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
// - End
val query1 = groupEvents.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", checkpointPath)
.start("./myop")
val newEventFile1=newEventsPath.resolve("eventNew1.json")
Files.write(newEventFile1, List(
"""{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
"""{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
).toIterable.asJava)
query1.processAllAvailable()
sparkSession.streams.awaitAnyTermination(10000)
}
private def newEvents(sparkSession: SparkSession) = {
val newEvents = Paths.get("target/newEvents/").toAbsolutePath
delete(newEvents)
Files.createDirectories(newEvents)
val dfNewEvents = sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2 seconds")
dfNewEvents
}
private val eventsSchema = StructType(List(
StructField("symbol", StringType, true),
StructField("price", DoubleType, true),
StructField("eventTime", TimestampType, false)
))
private def delete(dir: Path) = {
if(Files.exists(dir)) {
Files.walk(dir).iterator().asScala.toList
.map(p => p.toFile)
.sortWith((o1, o2) => o1.compareTo(o2) > 0)
.foreach(_.delete)
}
}
}
> aggregate query fails with watermark on eventTime but works with watermark on timestamp column generated by current_timestamp
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-21565
> URL: https://issues.apache.org/jira/browse/SPARK-21565
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Amit Assudani
>
> *Short Description:*
> Aggregation query fails when eventTime is used as the watermark column, but works with a newTimeStamp column generated by running SQL with current_timestamp.
> *Exception:*
> Caused by: java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347)
> at scala.None$.get(Option.scala:345)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
> at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
> at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
> at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
> at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> *Code to replicate:*
> package test
> import java.nio.file.{Files, Path, Paths}
> import java.text.SimpleDateFormat
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{SparkSession}
> import scala.collection.JavaConverters._
> object Test1 {
> def main(args: Array[String]) {
> val sparkSession = SparkSession
> .builder()
> .master("local[*]")
> .appName("Spark SQL basic example")
> .config("spark.some.config.option", "some-value")
> .getOrCreate()
> val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
> val checkpointPath = "target/cp1"
> val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEventsPath)
> delete(Paths.get(checkpointPath).toAbsolutePath)
> Files.createDirectories(newEventsPath)
> val dfNewEvents= newEvents(sparkSession)
> dfNewEvents.createOrReplaceTempView("dfNewEvents")
> //The below works - Start
> // val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
> // dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> // val groupEvents = sparkSession.sql("select symbol,newTimeStamp, count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
> // End
>
>
> //The below doesn't work - Start
> val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents ").withWatermark("eventTime","2 seconds")
> dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> val groupEvents = sparkSession.sql("select symbol,eventTime, count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
> // - End
>
>
> val query1 = groupEvents.writeStream
> .outputMode("append")
> .format("console")
> .option("checkpointLocation", checkpointPath)
> .start("./myop")
> val newEventFile1=newEventsPath.resolve("eventNew1.json")
> Files.write(newEventFile1, List(
> """{"symbol": "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
> """{"symbol": "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
> ).toIterable.asJava)
> query1.processAllAvailable()
> sparkSession.streams.awaitAnyTermination(10000)
> }
> private def newEvents(sparkSession: SparkSession) = {
> val newEvents = Paths.get("target/newEvents/").toAbsolutePath
> delete(newEvents)
> Files.createDirectories(newEvents)
> val dfNewEvents = sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2 seconds")
> dfNewEvents
> }
> private val eventsSchema = StructType(List(
> StructField("symbol", StringType, true),
> StructField("price", DoubleType, true),
> StructField("eventTime", TimestampType, false)
> ))
> private def delete(dir: Path) = {
> if(Files.exists(dir)) {
> Files.walk(dir).iterator().asScala.toList
> .map(p => p.toFile)
> .sortWith((o1, o2) => o1.compareTo(o2) > 0)
> .foreach(_.delete)
> }
> }
> }
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org