You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Biplob Biswas <re...@gmail.com> on 2017/06/06 10:03:12 UTC

[Spark Structured Streaming] Exception while using watermark with type of timestamp

Hi,

I am playing around with Spark structured streaming and we have a use case
to use this as a CEP engine. 

I am reading from 3 different kafka topics together. I want to perform
windowing on this structured stream and then run some queries on this block
on a sliding scale. Also, all of this needs to happen on the event time and
I have my corresponding timestamp attached to each event. 

Now I have something like this 

/val windowedEvents = filteredBamEvents.withWatermark("timestamp", "10
minutes")
                                         
.groupBy(functions.window($"timestamp", "10 minutes", "5 minutes"))/

where timestamp is of type long in my case class as follows:

/case class BAMIngestedEvent(id: String, eventName: String, eventID: String,
correlationID: Seq[String], timestamp: Long)/

But when I am running this example with my data from kafka I cam getting
this following exception:



Exception in thread "main" org.apache.spark.sql.AnalysisException: Event
time must be defined on a window or a timestamp, but timestamp is of type
bigint;;
EventTimeWatermark timestamp#36: bigint, interval 10 minutes
+- TypedFilter <function1>, class
com.airplus.poc.edl.model.BAMIngestedEvent,
[StructField(id,StringType,true), StructField(eventName,StringType,true),
StructField(eventID,StringType,true),
StructField(correlationID,ArrayType(StringType,true),true),
StructField(timestamp,LongType,false)], newInstance(class
com.airplus.poc.edl.model.BAMIngestedEvent)
   +- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent,
true]).id, true) AS id#32, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent,
true]).eventName, true) AS eventName#33, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent,
true]).eventID, true) AS eventID#34, mapobjects(MapObjects_loopValue0,
MapObjects_loopIsNull1, ObjectType(class java.lang.String),
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType,
fromString, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1,
ObjectType(class java.lang.String)), true), assertnotnull(input[0,
com.airplus.poc.edl.model.BAMIngestedEvent, true]).correlationID) AS
correlationID#35, assertnotnull(input[0,
com.airplus.poc.edl.model.BAMIngestedEvent, true]).timestamp AS
timestamp#36L]
      +- MapElements <function1>, interface org.apache.spark.sql.Row,
[StructField(value,StringType,true)], obj#31:
com.airplus.poc.edl.model.BAMIngestedEvent
         +- DeserializeToObject createexternalrow(value#16.toString,
StructField(value,StringType,true)), obj#30: org.apache.spark.sql.Row
            +- Project [value#16]
               +- Project [cast(key#0 as string) AS key#15, cast(value#1 as
string) AS value#16]
                  +- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@222acad,kafka,List(),None,List(),None,Map(startingOffsets
-> latest, subscribe -> iom, edl, kafka.bootstrap.servers ->
airpluspoc-hdp-dn0.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn1.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn2.germanycentral.cloudapp.microsoftazure.de:9092),None),
kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5,
timestampType#6]

	at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
	at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
	at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:204)
	at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
	at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
	at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
	at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
	at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
	at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
	at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2850)
	at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:571)
	at com.airplus.poc.edl.CEPForBAM$.main(CEPForBAM.scala:47)
	at com.airplus.poc.edl.CEPForBAM.main(CEPForBAM.scala)






--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-Exception-while-using-watermark-with-type-of-timestamp-tp28745.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org